Tuesday 21 April 2015

Apache Camel - AWS3 Integration - Moving files between buckets

Apache Camel is a great tool that helps when integrating heterogeneous systems. I'd say is the best one I've seen. By using Its comprehensive DSL, you can implement several integration patterns very easily. It also offers  well modularised APIs  ready to set and use and an extensive active community. On this post, I'll talk about one of these APIs, which is camel-aws3.
AWS3 is  a very popular storage service offered by Amazon and camel-aws3 is an API that let you manage files stored there. It is different than camel-file component where the idea is to deal with files stored in a given file system.
In order to use camel-aws3,  first thing we need to do is add It as dependency in our project.  Here, I'm using SBT, but it works on the same way for any other build/dependency/management tool.

libraryDependencies ++= Seq(
  "org.apache.camel" % "camel-core" % "2.15.0",
  "org.apache.camel" % "camel-aws" % "2.15.0",
  "commons-io" % "commons-io" % "2.4"
)

The lib commons-io (declared at line 4) isn't mandatory. I'm just using It to make my life easier when dealing with streams coming from AWS3.

Next thing to do is define a route. By defining a route, you tell camel to do things for you. Here I'm using the Java DSL, but you can achieve the same thing by using the  XML configuration support. Here is my route definition:

 
class MyRouteBuilder extends RouteBuilder { 
    override def configure(): Unit ={
        from("aws-s3://json-in?amazonS3Client=#client&maxMessagesPerPoll=15&delay=3000&region=sa-east-1").
         to("bean:fileProcessor").
         choice().
           when(header("status").isEqualTo("ok")).
            to("aws-s3://json-out?amazonS3Client=#client&region=sa-east-1").
           otherwise().
            to("aws-s3://json-error?amazonS3Client=#client&region=sa-east-1")
    }

}


Let's see what is going on here:
  • Defining the target:  on line 4, I'm setting from where files are coming from and how. Camel-aws3 API accepts parameters that specify how files will be fetched.  This is mostly what the rest of the parameters are doing. For more detail about those, check the website documentation.
  • Defining what happens after files are fetched - on line 5, I'm setting who is gonna process each file downloaded from AWS3. It's a simple scala class that is defined on the camel context. You'll more about this one in a sec.
  • Processing output results  - from line 6 until the end, I'm setting what is gonna happen based on the result from fileProcessor. This is the Control based router API acting. Here we have the logic that will move files from the origin bucket or another based on the output from fileProcessor.

The File Processor.


This is the implementation that will be called for each file arriving from AWS3. Nothing special about that. Ideally,this is where we need to apply any logic on the file being processed.
 
class FileProcessor extends Processor {
  import java.util.logging.Logger
  
  val logger = Logger.getLogger(classOf[FileProcessor].getName)

  override def process(msg: Exchange): Unit ={
    val content = msg.getIn.getBody(classOf[String])
    // Do Whatever you need with the content
    logger.info(content)
    Messenger.send(message = msg, status = Some("ok"))
  }

}


Actually, there is something specific happening on line 11. There, I'm sending content that will be used by the next step on the flow. The next step I'm referring here is the one currently implemented by MyRouterBuilder from line 6, that will decide whether it needs to move the file to a given bucket or another.

 
object Messenger {
  import org.apache.commons.io.IOUtils

  def send(message: Exchange, status: Option[String]): Unit ={
    message.getOut.setHeader("status", status.getOrElse("error"))
    message.getOut.setHeader(S3Constants.KEY,  message.getIn.getHeader(S3Constants.KEY).asInstanceOf[String])
    message.getOut.setHeader(S3Constants.BUCKET_NAME, message.getIn.getHeader(S3Constants.BUCKET_NAME))
    message.getOut.setBody(IOUtils.toInputStream(message.getIn.getBody(classOf[String])))
  }

}

By adding information on the output structure, the next step on the process will be able to decide what happens next based on the content. MessageSender isn't actually sending anything. It is just changing the Exchange object by reference. These names were picked just to make what is going on more explicitly to the user, once changing structures by reference makes the code a bit hard to understand.

 
object CamelContext {
  import com.amazonaws.auth.BasicAWSCredentials
  import com.amazonaws.services.s3.AmazonS3Client
  import org.apache.camel.main.Main

  val accessKey = ""
  val secretKey = ""

  def start(): Unit ={

    val camel  = new Main
    camel.bind("client", new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey)))
    camel.bind("fileProcessor", new FileProcessor)
    camel.addRouteBuilder(new MyRouteBuilder)
    camel.enableHangupSupport()
    camel.run()
  }

}

Here I'm initialising the Camel context.  The client (line 13)  will be used for authorisation and the fileProcessor  (line 14)  will be used to process files. From this point, they can be referenced by any router (as MyRouterBuilder is doing).
Keep in mind you still need to define your own keys on lines 7 and 8.
From this point,  you just need to call the start method, like this:

 

object Main extends App {
  CamelContext.start()
}


That is It. If you want to look into details, a working example can be checked here.

No comments:

Post a Comment

Note: only a member of this blog may post a comment.