kalyan February 2016

Read file locations from table and copy to specific folder using pollEnrich()

I am trying to write a camel route that reads a database table to get the list of absolute file paths and then copy those files to another folder. However only the file path is created as content instead of the original content.

    from("timer://testDataGen?repeatCount=1")
    .to("sql:" + positionSql + "?dataSource=dataSource")
    .split(body())
    .to("file://" + positionlistDir )
    .log("Finished copying the list of Files.")

Please let me know what am i missing here to convert an absolute file path to an actual file.

Update #1.

Below snippet is invoking the pollEnrich(). But instead the pollEnrich() is copying the no of files which is equal to the no of rows returned by the sql and not according to the file name from the previous exchange.

        String positionListSqlOptions = "?dataSource=dataSource";
//      String positionSrcDirOptions = "?noop=true&delay=500&readLockMarkerFile=false&fileName=${header.positionFileToBeCopied}";
        String positionSrcDirOptions = "?noop=true&delay=500&readLockMarkerFile=false&fileName=${body}";
        String positionStagingDirOptionsForWriting = "?doneFileName=${file:name}.DONE";

        from("timer://testDataGen?repeatCount=1")
        .to("sql:" + positionListSql + positionListSqlOptions)
        .split(body())
        \\ Getting the column value from the resultset which is a LinkedCaseInsensitiveMap and storing in the body
        .process(new positionFeederProcessor()) 
        .setHeader("positionFileToBeCopied", body())
        .pollEnrich("file://" + positionSrcDir + positionSrcDirOptions)
//      .pollEnrich().simple("file://" + positionSrcDir + positionSrcDirOptions)
        .to("file://" + positionStagingDir + positionStagingDirOptionsForWriting)
        .log("Finished copying the list of Files.");

I am still unable to get the actual file name passed to the pollingEnrich() endpo

Answers


Jérémie B February 2016

the file component, when used on a to definition, produce a file with the content of the exchange, it doesn't read a file. you can use for example a pollEnrich processor :

from("timer://testDataGen?repeatCount=1")
    .to("sql:" + positionSql + "?dataSource=dataSource")
    .split(body())
    .pollEnrich().simple("file:folder?fileName=${body}")
    .to("file://" + positionlistDir )
    .log("Finished copying the list of Files.")


kalyan February 2016

Well, Finally I was able to do this without using pollEnrich() at all.

    String positionListSqlOptions = "?dataSource=dataSource";
    String positionSrcDirOptions = "?noop=true&delay=500&readLockMarkerFile=false&fileName=${header.CamelFileName}";
    String positionStagingDirOptionsForWriting = "?fileName=${header.position.file.name}&doneFileName=${file:name}.DONE";

    from("timer://testDataGen?repeatCount=1")
    .to("sql:" + positionListSql + positionListSqlOptions)
    .routeId("Copier:")
    .setHeader("positionFileList", body())
    .log("Creating the list of position Files ...")
    .split(body())
    .process(new PositionListProcessor())
    .setHeader("position.file.name", body())
    .setHeader("position.dir.name", constant(positionSrcDir))
    .process(new PositionFileProcessor())
    .choice()
        .when(body().isNull())
            .log("Position File not found. ${header.position.file.name}")
        .otherwise()
            .to("file://" + positionStagingDir + positionStagingDirOptionsForWriting)
            .log("Position File Copied from Src to : " + "${header.CamelFileNameProduced} ... ${headers} ...");

And here are the processors.

public class PositionListProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
    LinkedCaseInsensitiveMap positionFilesResultSet = (LinkedCaseInsensitiveMap) exchange.getIn().getBody();
    try {
        String positionFileStr = positionFilesResultSet.get("PF_LOCATION_NEW").toString();
        }
        exchange.getOut().setBody(positionFileStr.trim());
    } catch (Exception e) {     }
} }




public class PositionFileProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
    String filename = exchange.getIn().getBody(String.class);
    String filePath = exchange.getIn().getHeader("position.dir.name", String.class);
    URI uri = new URI("file:///".concat(filePath.concat( 

Post Status

Asked in February 2016
Viewed 1,302 times
Voted 11
Answered 2 times

Search




Leave an answer