constantlearner February 2016

duplicate message processed when polling files from s3

I am using the S3 module to poll files from S3. It downloads the file to the local system and starts processing it. I am running this on a 3-node cluster with the module count set to 1. Now let's assume the file has been downloaded from S3 to the local system and XD is processing it. If the XD node goes down, it may have processed only half of the messages. When the server comes back up, it will start processing the file again, so I will get duplicate messages. I am trying to switch to the idempotent pattern with a message store so that I can change the module count to 3, but this duplicate-message issue will still be there.
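For reference, an idempotent receiver backed by a shared metadata store is typically wired like this (a minimal sketch; the `redisConnectionFactory` bean and the endpoint/channel names are assumptions, not taken from the config below):

```xml
<!-- Shared metadata store so all 3 nodes see the same processed keys
     (redisConnectionFactory is an assumed, pre-configured bean) -->
<bean id="redisMetadataStore"
      class="org.springframework.integration.redis.metadata.RedisMetadataStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<!-- Interceptor that skips messages whose key was already recorded -->
<int:idempotent-receiver id="idempotentReceiver"
        endpoint="fileProcessingEndpoint"
        metadata-store="redisMetadataStore"
        key-expression="payload.name"
        discard-channel="nullChannel"/>
```

With a shared store (Redis/Mongo) instead of the in-memory `SimpleMetadataStore`, the "already processed" state survives a node crash and is visible to all nodes.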

  <int:poller fixed-delay="${fixedDelay}" default="true">
      <int:advice-chain>
          <ref bean="pollAdvise"/>
      </int:advice-chain>
  </int:poller>


  <bean id="pollAdvise" class="org.springframework.integration.scheduling.PollSkipAdvice">
      <constructor-arg ref="healthCheckStrategy"/>
  </bean>

  <bean id="healthCheckStrategy" class="ServiceHealthCheckPollSkipStrategy">
      <property name="url" value="${url}"/>
      <property name="doHealthCheck" value="${doHealthCheck}"/>
  </bean>



  <bean id="credentials" class="org.springframework.integration.aws.core.BasicAWSCredentials">
      <property name="accessKey" value="${accessKey}"/>
      <property name="secretKey" value="${secretKey}"/>
  </bean>



  <bean id="clientConfiguration" class="com.amazonaws.ClientConfiguration">
      <property name="proxyHost" value="${proxyHost}"/>
      <property name="proxyPort" value="${proxyPort}"/>
      <property name="preemptiveBasicProxyAuth" value="false"/>
  </bean>


        <bean id="s3Operations" class="org.springframework.integration.aws.s3.core.CustomC1AmazonS3Operations">
            <constructor-arg index="        

Answers


Artem Bilan February 2016

This looks like a continuation of Multiple message processed, but unfortunately we don't see the <idempotent-receiver> configuration in your case.

According to your comment, it looks like you are still using the SimpleMetadataStore, or you are cleaning the shared one (Redis/Mongo) very often.

You should share more info on where to dig. Some logs and a DEBUG investigation would be good, too.

UPDATE

The Idempotent Receiver is exactly for an endpoint. In your config it is applied to the MessageChannel. That's why you don't achieve proper behavior: the MessageChannel is simply ignored by the IdempotentReceiverInterceptor.

You should add an id to your <int-file:splitter> and use that id in the endpoint attribute. I'm not sure it would be a good idea to use the File object as the key for idempotency; the file name sounds better.
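That wiring might look like this (a sketch; the channel names and the metadata-store bean are assumptions):

```xml
<!-- Splitter with an explicit id so it can be targeted as an endpoint -->
<int-file:splitter id="fileSplitter"
        input-channel="filesChannel"
        output-channel="linesChannel"/>

<!-- Intercept the splitter endpoint itself, keyed on the file name -->
<int:idempotent-receiver id="fileIdempotentReceiver"
        endpoint="fileSplitter"
        metadata-store="redisMetadataStore"
        key-expression="payload.name"/>
```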

UPDATE 2

If a node goes down, and let's assume a file has been downloaded to the XD node (a file with millions of records may be gigabytes in size), and I have processed half the records when the node crashes. When the server comes up, I think we will process the same records again?

OK, I got your point finally! You have an issue with the lines already split from the file.

Also, I'd use an Idempotent Receiver for the <splitter> as well, to avoid duplicate files from S3.

To fix your use case, you should place one more endpoint between the <splitter> and the output channel, a <bridge>, to skip duplicate lines with the Idempotent Receiver.
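Such a bridge might be configured like this (a sketch; the channel names and the key expression are assumptions — the key must uniquely identify a line, e.g. the file name plus the splitter's sequence number):

```xml
<!-- Splitter emits individual lines to an intermediate channel -->
<int-file:splitter input-channel="filesChannel"
        output-channel="splitLinesChannel"/>

<!-- Bridge endpoint: exists only so the idempotent receiver
     has an endpoint to intercept between splitter and output -->
<int:bridge id="lineDeduplicator"
        input-channel="splitLinesChannel"
        output-channel="processChannel"/>

<!-- Skip lines whose key was already recorded in the shared store -->
<int:idempotent-receiver id="lineIdempotentReceiver"
        endpoint="lineDeduplicator"
        metadata-store="redisMetadataStore"
        key-expression="headers['file_name'] + '_' + headers['sequenceNumber']"/>
```

On restart after a crash, lines whose keys are already in the shared metadata store are discarded at the bridge, so only the unprocessed remainder of the file flows downstream.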

Post Status

Asked in February 2016
Viewed 2,134 times
Voted 5
Answered 1 time
