Apache Camel + Karaf

Cake… just cake…
While contracting at AT&T, I created a dynamically deployed, cloud-friendly (read: auto-scaling group) service for Apache Camel and its routes.
I used JAXB (not a fan) to bind route XML, defined with the Spring DSL, into route objects that are added to the Camel context.

http://camel.apache.org/karaf.html

Here's the Groovy class (a Grails service):

package nimbus

import nimbus.util.NimbusTemplateEngine
import org.apache.camel.impl.DefaultCamelContext
import org.springframework.beans.BeansException;
import org.springframework.context.support.AbstractApplicationContext
import org.springframework.context.support.ClassPathXmlApplicationContext
import org.springframework.core.io.FileSystemResource
import org.springframework.core.io.support.PathMatchingResourcePatternResolver
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware
import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.apache.camel.*
import org.apache.camel.model.*
import org.apache.camel.builder.*
import org.apache.camel.model.config.*
import org.apache.camel.model.dataformat.*
import org.apache.camel.model.language.*
import org.apache.camel.model.loadbalancer.*
import org.codehaus.groovy.grails.commons.spring.GrailsApplicationContext

import grails.spring.BeanBuilder
import groovy.text.SimpleTemplateEngine
import javax.xml.bind.JAXBContext
import javax.xml.bind.JAXBException
import javax.xml.bind.Unmarshaller

import com.hazelcast.core.MessageListener
import com.hazelcast.core.Message;

/**
 * ComputeService - Wrapper for Apache Camel
 *
 * <p>
 * Apache Camel is a powerful open source integration framework based on known Enterprise
 * Integration Patterns with powerful Bean Integration.
 * Camel lets you create the Enterprise Integration Patterns to implement routing and mediation
 * rules in either a Java based Domain Specific Language (or Fluent API), via Spring based Xml
 * Configuration files or via the Scala DSL. This means you get smart completion of routing rules
 * in your IDE whether in your Java, Scala or XML editor.
 * Apache Camel uses URIs so that it can easily work directly with any kind of Transport or
 * messaging model such as HTTP, ActiveMQ, JMS, JBI, SCA, MINA or CXF Bus API together with
 * working with pluggable Data Format options. Apache Camel is a small library which has minimal
 * dependencies for easy embedding in any Java application. Apache Camel lets you work with the
 * same API regardless of which kind of Transport is used, so learn the API once and you will be
 * able to interact with all the Components that are provided out-of-the-box.
 * </p>
 *
 * @see <a href="http://camel.apache.org/">Camel</a>
 *
 * @author Terry Walters
 *
 * @since 1.0
 */
class ComputeService implements MessageListener, ApplicationContextAware {

def eelService
def deferoService
def restletComponentService
def static NAMESPACE = “nimbus”
def static SERVICENAME = “ComputeService”
def static TOPIC_NAME = NAMESPACE+”.”+SERVICENAME+”.topic”
def static ADMIN_TOPIC = NAMESPACE+”.admin.topic”
def static EEL_CLASS = “grails.app.services.nimbus.ComputeService”
def static EEL_CODE_GENERAL = 1
def static USER_JAR_DIR = ‘lib’
def static USER_BEAN_DEF_LOCATION = “conf”

def grailsApplication
def adaptorRouterService
def restletComponent
def started = false
def initialized = false
def registeredListener=false

//Get ApplicationContext from Grails
def appCtx

//create camel-1 context to be used throughout this application
CamelContext ctx

// Camel template – a handy class for kicking off exchanges
ProducerTemplate template

static transactional = true
static expose = [‘jmx’]

public void onMessage(Message msg) {
def EEL_METHOD = “onMessage”
def messageObject = msg.getMessageObject()
//println “ComputeService::onMessage msg: ${msg}, messageObject: ${messageObject}”

switch(messageObject){
case “init”:
init()
break

case “start”:
start()
break

case “stop”:
stop()
break

case “shutdown”:
shutdown()
break
case ComputeRoute:
ComputeRoute cr = (ComputeRoute)messageObject
//println “\t\t${cr.routeName} version: ${cr.routeVersion}”
if(!initialized || !started) {
//println “Attempt to insert or update route: ${cr.namespace}.${cr.routeName}.${cr.routeVersion} before ComputeService has been successfully started and initialized”
return
}
addRoute(cr)
//cr.save()
//cr.cacheSave()
break

case GString:
def command = (String)messageObject
//println “command: ${command}”
switch(command){

case “init”:
init()
break

case “start”:
start()
break

case “stop”:
stop()
break

case “shutdown”:
shutdown()
break

case “listRoutes”:
// for every route on this server
// add the route status to a map
// using this servers address as the key
def self
deferoService.getMembers().each{member->
if(member.self)
self = member
}
listRoutes().each{rid->
def rs = getRouteStatus(rid)
def str = “${rid}.${rs}”
deferoService.getMap(“nimbus.cluster.${self.address}.${self.port}.routes”).put(str)
}
break

case ~/^delete .*/:
// “delete namespace.name.version”
// get the route name to be deleted
def routeName = command.split()[1]
if(!initialized || !started) {
//println “Attempt to delete route: ${routeName} before ComputeService has been successfully started and initialized”
break
}
if(routeName)
delRoute(routeName)
break

case ~/^start .*/:
def routeName = command.split()[1]
if(routeName)
startRoute(routeName)
break

case ~/^stop .*/:
def routeName = command.split()[1]
if(routeName)
stopRoute(routeName)
break

default:
//println “Unknown command: ${command}”
break
}
break

case “listRoutes”:
// for every route on this server
// add the route status to a map
// using this servers address as the key
def self
deferoService.getMembers().each{member->
//println member
if(member.localMember())
self = member
}
listRoutes().each{rid->
def rs = getRouteStatus(rid)
//println “listRoutes.${rid}.${rs}”
deferoService.getMap(“nimbus.cluster.${self.getInetAddress()}.routes”).put(self.getInetAddress(),rs)
}
break

case ~/^delete .*/:
// “delete namespace.name.version”
// get the route name to be deleted
def routeName = messageObject.split()[1]
if(!initialized || !started) {
//println “Attempt to delete route: ${routeName} before ComputeService has been successfully started and initialized”
break
}
if(routeName)
delRoute(routeName)
break

default:
println “\t\tUnknown message ${messageObject}”
}

}

def suspend() {
ctx.suspend()
}
def resume() {
ctx.resume()
}

def start = {
def EEL_METHOD = “start”

if(started) return

started=true
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “ComputeService:start” )

//Restlet URI matching set default to EQUALS
restletComponent.defaultHost.setDefaultMatchingMode(org.restlet.routing.Template.MODE_EQUALS);

// Load user defined beans into context
// Load jars from NIMBUS_HOME/lib
def nimbusHome = System.getProperty(‘NIMBUS_HOME’) ?: System.getenv(“NIMBUS_HOME”)
if (!nimbusHome) eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, severity: “error”, info: “NIMBUS_HOME is null, user defined beans will not be loaded” )
def loader = URLClassLoader.newInstance(appCtx.getClassLoader().getURLs(), appCtx.getClassLoader())
if (!loader) {
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, severity: “error”, info: “Could not get classloader, user defined beans will not be loaded” )
} else {
UserDefinedJar.list().each { userDefinedJar ->
try {
def jardir = new File(“${nimbusHome}/${USER_JAR_DIR}/${userDefinedJar.namespace}/${userDefinedJar.jarVersion}”)
def jar = jardir.listFiles().find { it.name == userDefinedJar.jarName }
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “ComputeService loading jar: ${jar} type: ${jar.class.name}” )
loader.addURL(jar.toURI().toURL())
} catch (all) {
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, severity: “error”, info: “Failed to jar file: ${nimbusHome}/${USER_JAR_DIR}/${userDefinedJar.namespace}/${userDefinedJar.jarVersion}”, exception: all )
}
}
}
// Load beans
def bb = new BeanBuilder(appCtx, loader)
def beansLocation = “${nimbusHome}/${USER_BEAN_DEF_LOCATION}”
def beansDefResource = null
UserDefinedBeansDef.list().each { beansDef ->
try {
def beansDefFile = new File(“${beansLocation}/${beansDef.namespace}/${beansDef.beansDefVersion}/${beansDef.beansDefName}${!beansDef.beansDefName.endsWith(‘.groovy’) ? ‘.groovy’ : ”}”)
beansDefResource = new FileSystemResource(beansDefFile)
bb.loadBeans(beansDefResource)
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “Loaded user defined beans: ${beansDefResource}” )
} catch (all) {
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, severity: “error”, info: “Failed to load bean definitions from: ${beansLocation}/${beansDef.namespace}/${beansDef.beansDefVersion}/${beansDef.beansDefName}”, exception: all )
}
}
def bbAppCtx
try {
bbAppCtx = bb.createApplicationContext()
// Add user defined beans to appCtx for easier access from scripts within route
bb.getBeanDefinitions().each { k, v ->
appCtx.registerBeanDefinition(k, v)
}
} catch (all) {
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, severity: “error”, info: “Failed to load bean definitions, routes will not have access to any User Defined Beans!”, exception: all )
// Use appCtx if BeanBuilder fails
bbAppCtx = appCtx
}

// Create CamelContext to be used throughout this application
if (ctx) {
ctx.destroy()
}
ctx = new org.apache.camel.spring.SpringCamelContext(bbAppCtx)

// Camel template – a handy class for kicking off exchanges
template = ctx.createProducerTemplate();
ctx.start()
restletComponentService.stop()
restletComponentService.setCamelContext(ctx)
restletComponentService.start()
ctx.addComponent(“restlet”, restletComponentService)
//println “ComputeService::Started”

}

def stop = {
def EEL_METHOD = “stop”

if(!started) return

//println “ComputeService::stop”
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “ComputeService:stop” )
ctx.getShutdownStrategy().shutdown()
initialized = false
started = false
}

/**
*

getRouteStatus – returns the status for the given route id

*
* @params nimbus.ComputeRoute
* @see also sa.CamelService
* @returns
*/
def getRouteStatus(rid) {
return ctx.getRouteStatus(rid)
}

/**
*

listRoutes – list all Route IDs

*
* @params nimbus.ComputeRoute
* @see also sa.CamelService
* @returns
*/
def listRoutes ={
def res = []
ctx.getRoutes().each{route-> res.add(route.getId())}
return res
}
/**
* Validate that the endpoint(s) in the given route are
* unique. If a matching endpoint
* is found in another route, return its route id. If no match is found,
* return null
* @param cr ComputeRoute
* @return the route id of the route with matching endpoint or null
*/
def validateFromEndpointUnique(cr) {
def returnVal = null
def unique = false
RouteDefinition route = getRouteDefinition(cr)

// endpoints Map
def endpoints = getAllEndpoints()

List inputs = route.getInputs()
for (FromDefinition from : inputs) {
for (Object endpoint : endpoints.keySet()) {
// Ignore endpoints for the current route, but if an endpoint from another route matches, we have a duplicate
if (endpoints.get(endpoint) != route.getId() && endpoint == from.getUriOrRef()) {
returnVal = endpoints.get(endpoint)
break
}
}
}
return returnVal
}

/**
* Determine if the given route has any input () endpoints
* that already exist in any other routes
* @param route
* @return true if all input endpoints are unique, false otherwise
*/
def isUniqueEndpoints(route) {
def returnVal = true

// endpoints Map
def endpoints = getAllEndpoints()

List inputs = route.getInputs()
for (FromDefinition from : inputs) {
for (Object endpoint : endpoints.keySet()) {
// Ignore endpoints for the current route, but if an endpoint from another route matches, we have a duplicate
if (endpoints.get(endpoint) != route.getId() && endpoint == from.getUriOrRef()) {
returnVal = false
break
}
}
}
return returnVal
}

/**
* Return a Map of all endpoints and their corresponding routeId from routes defined in all contexts
* @return Map
*/
def getAllEndpoints = {
def endpoints = new HashMap()
ctx.getRouteDefinitions().each { rd ->
rd.getInputs().each { from ->
endpoints.put(from.getUriOrRef(), rd.getId())
}
}
return endpoints
}

def getRouteDefinition(cr) {
def EEL_METHOD = “getRouteDefinition”
RouteDefinition route
JAXBContext context = JAXBContext.newInstance(“org.apache.camel:org.apache.camel.model:org.apache.camel.model.config:org.apache.camel.model.dataformat:org.apache.camel.model.language:org.apache.camel.model.loadbalancer”)
Unmarshaller unmarshaller = context.createUnmarshaller()
Object value = unmarshaller.unmarshal(new StringReader(propertyReplace(cr.routeDefinition)))
if (value instanceof RouteDefinition) {
route = (RouteDefinition)value
route.routeId(cr.namespace+”.”+cr.routeName+”.”+cr.routeVersion)
} else {
eelService?.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, code:EEL_CODE_GENERAL, info:”Failed to unmarshall route definition for ${cr?.generateId()} ${cr.routeDefinition}”, severity: “error”)
}
return route
}
/**
*

addRoute – creates a Route dynamically from the provided XML

*
* @params sa.ServiceMethodMap
* @see also sa.CamelService
* @returns
*/
def addRoute(cr) {
def EEL_METHOD = “addRoute”
int EEL_CODE_GENERAL = 1
int EEL_CODE_ADD_ROUTE_FAILURE = 2

try{
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “addRoute: adding route: ${cr.namespace}.${cr.routeName}.${cr.routeVersion}” )
if (cr) {
RouteDefinition route = getRouteDefinition(cr)
def routeName = cr.namespace+”.”+cr.routeName+”.”+cr.routeVersion

// Verify the endpoint does not already exist
if (!isUniqueEndpoints(route)) {
//println “Route ${routeName} contains endpoint that already exists, route will not be added/started in Camel”
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “Route ${routeName} contains endpoint that already exists, route will not be added/started in Camel” )
} else {
// add or update route in running camel context
ctx.stopRoute(routeName)
ctx.removeRoute(routeName)
if(cr.deployStatus==”Active”)
ctx.addRouteDefinitions(Collections.singletonList(route));
try{
def status = ctx.getRouteStatus(routeName)
def msg = “${routeName} status: ${status}”
//println “\t\t${msg}”
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: msg )
}catch(all){
eelService?.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, code:EEL_CODE_GENERAL, info:”addRoute for ${cr.routeName} failed, error getting route status”, severity: “error”, exception: all )
println all.getMessage()
}
}
} else{
if (!cr) {
eelService?.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, code:EEL_CODE_GENERAL, info:”addRoute failed with ${value}”, severity: “error”)
} else {
eelService?.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, code:EEL_CODE_GENERAL, info:”addRoute ${cr.routeName} failed with ${value}”, severity: “error”)
}
println “addRoute failed”
}
}catch(Throwable t) {
println “addRoute for ${cr?.routeName} failed”
eelService?.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, code:EEL_CODE_GENERAL, info:”addRoute for ${cr?.routeName} failed”, severity: “error”, exception: t )
}
}

/**
*

delRoutes – deletes a Route dynamically based on the name passed

*
* @params nimbus.ComputeRoute
* @see also sa.CamelService
* @returns
*/
def delRoute(routeName) {
def EEL_METHOD = “delRoute”
int EEL_CODE_GENERAL = 1

try{
if (eelService?.isInfo())
eelService?.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “deleting route: ${routeName}” )
RouteDefinition route = ctx.getRouteDefinition(routeName)
if (route) {
ctx.removeRouteDefinitions(Collections.singletonList(route))
}
}catch(all){
println “Failed to delete route ${routeName}”
println all.getMessage()
eelService?.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, code:EEL_CODE_GENERAL, info:”Failed”, severity: “error”, exception: all )
}
}

/**
*

init – Initialize this service:

*

      *

    • starts routes

*

  • subscribes to Admin messages

*

  • subscribes to Service messages

*

* @params nimbus.ComputeRoute
* @see also sa.CamelService
* @returns
*/
def init = {
start()
def EEL_METHOD = “init”
int EEL_CODE_GENERAL = 1

//println “Initializing ComputeService”
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “Initializing ComputeService” )

if(!registeredListener){
deferoService.addTopicListener(TOPIC_NAME, this)
deferoService.addTopicListener(ADMIN_TOPIC, this)
registeredListener=true
}

try{
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “Adding routes” )
def routeList = ComputeRoute.list()
if (!routeList) {
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “No routes to add” )
} else {
routeList.each{
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “Adding route: ${it.namespace}.${it.routeName}.${it.routeVersion}” )
addRoute(it)
}
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “Adding routes completed” )
}
initialized=true
println “ComputeService initialized sucessfully”
if (eelService?.isInfo())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, info: “ComputeService initialized sucessfully” )
}catch(all){
all.printStackTrace()
eelService?.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, code:EEL_CODE_GENERAL, info:”ComputeService failed to initialize”, severity: “error”, exception: all )
}

}

def shutdown = {
stop()
}

def propertyReplace(input) {
def EEL_METHOD = “propertyReplace”
int EEL_CODE_GENERAL = 1
def output

try {
if (eelService?.isDebug())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, severity:”debug”, info:”Replacing environment specific values in route”)
output = NimbusTemplateEngine.propertyReplace(input, System.properties)
} catch (all) {
eelService?.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, code:EEL_CODE_GENERAL, info:”Failed to replace environment specific values in route”, severity: “error”, exception: all )
output = input
}
if (eelService?.isDebug())
eelService.report(namespace: NAMESPACE, serviceName: SERVICENAME, name: EEL_CLASS, method: EEL_METHOD, severity:”debug”, info:”Output from propertyReplace: ${output}”)
return output
}

/**
*

getRouteDefinitions

*
* @params routeId
* @see
* @returns
*/
def getRouteDefinitions(){
return ctx.getRouteDefinitions()
}

/**
*

getEndpoints

*
* @params routeId
* @see
* @returns
*/
def getEndpoints(){
return ctx.getEndpoints()
}

/**
*

getEndpointsMap

*
* @params routeId
* @see
* @returns
*/
def getEndpointsMap(){
return ctx.getEndpointsMap()
}

/**
*

startRoute

*
* @params routeId
* @see
* @returns
*/
def startRoute(routeId) {
return ctx.startRoute(routeId)
}

/**
*

stopRoute

*
* @params routeId
* @see
* @returns
*/
def stopRoute(routeId){
return ctx.stopRoute(routeId)
}

/**
*

removeRoute

*
* @params routeId
* @see
* @returns
*/
def removeRoute(routeId){
return ctx.removeRoute(routeId)
}

/**
*

addRoutes

*
* @params routeBuilder
* @see
* @returns
*/
def addRoutes(routeBuilder){
return ctx.addRoutes(routeId)
}

/**
*

suspendRoute

*
* @params routeId
* @see
* @returns
*/
def suspendRoute(routeId){
return ctx.suspendRoute(routeId)
}

/**
*

resumeRoute

*
* @params routeId
* @see
* @returns
*/
def resumeRoute(routeId){
return ctx.resumeRoute(routeId)
}

@Override
public void setApplicationContext(ApplicationContext appCtx) throws BeansException {
this.appCtx = appCtx
}
}

Advertisements