Skip to Content
author's profile photo Former Member
Former Member

Update opcode rolling back all records in bulk

Hi,


In our project we are reading in events on a stream called PRF_Result_Linkstream then putting them through a flex window and trying to update a record if it is in the window with the code below. Not all records coming in on the stream are in the window though.


OUTPUT setOpcode(PRF_Result_Linkstream,update);


Unfortunately in seems with the update opcode that if the record is not in the window it forces the roll back of that event plus any events that came in around the same time as the message below shows.


2013-05-08 04:47:18.232 | 453 | container | [SP-4-123006] (3366.905) sp(2488) StoreIndex(Master_Window)::put() -- roll back transaction of size 10

2013-05-08 04:47:18.960 | 453 | container | [SP-4-123002] (3367.633) sp(2488) StoreIndex(Master_Window)::put() bad update, tid=9061


We had the same issue with the delete and managed to get around this using the safedelete opcode. Is there something similar for update? (note we do not wish to upsert, if the record is not in the window we would like to ignore it). Is there a way to stop the records being batched up, so they can be treated individually?


Thanks very much,

Mark Wyatt (DHS Australia).

Add a comment
10|10000 characters needed characters exceeded

Related questions

1 Answer

  • Posted on May 15, 2013 at 09:01 PM

    Hi Mark,

    Here is a way to do it
    ===================================
    /*Updates coming through this stream */
    CREATE INPUT STREAM s1 schema (symbol string,volume integer,price float);


    /*break the batching using a flex stream and a dictionary this will output 1 row at a time */
    CREATE FLEX flex1 IN s1 OUT OUTPUT STREAM break_batch SCHEMA(symbol string,volume integer, price float)
    BEGIN
    DECLARE
    dictionary(long,typeof(s1)) dict;
    typeof(s1) record;
    long id:=1;
    long fill_id:=1;
    END;
    on s1
    {
    dict[s1.ROWID]:=s1;
    print('inside dictionary',s1.symbol);
    fill_id++;

    };
    every 1 second
    {
    if(fill_id=id)
    {
    exit;
    }
    else
    {
    record:=dict[id];
    print('inside every 1 second',record.symbol);
    output setOpcode(record,insert);
    remove(dict,id);
    id++;

    }

    };
    END;

    /*Initial value of the window gets filled up through this stream */
    CREATE INPUT STREAM fill_window schema(symbol string,volume integer,price float);

    CREATE FLEX flex2 in break_batch,fill_window out output window W1 SCHEMA(symbol string,volume integer, price float)
    primary key(symbol)
    BEGIN
    on break_batch
    {
    output setOpcode(break_batch,update);
    };
    on fill_window
    {
    output setOpcode(fill_window,insert);
    };
    END;
    ====================================
    Send in rows in to fill_window stream first using Manual Input->Transaction mode
    AAA,10,10
    BBB,10,10
    CCC,10,10
    DDD,10,10

    Send in rows in to s1 using Manul Input->Transaction mode
    AAA,1,1
    BBB,1,1
    EEE,1,1

    You will see 2 transactions getting updated and 1 getting thrown out.


    Please note, this can slow down the system as I am breaking it and sending 1 row at a time every 1 second. Also I have just run some simple test cases with it and have never implemented
    this in a large project. I have a message from Jason and I understand your system to be quite complicated. I am not sure how this will work with heavy load.

    If you would like to ask more questions on this and would like more information on this then please open a new message so I can work with you on your issue.
    Thanks,
    Geetha.

    Add a comment
    10|10000 characters needed characters exceeded

    • Good feedback - thanks. I've logged it as an enhancement request. Not sure how soon we'll get to it - drop me a note if you see it becoming a pressing need. And I've also put in a request to get an explanation of this added ot the documentation (I think I'll also post a little write up explaining this here in the community).

Before answering

You should only submit an answer when you are proposing a solution to the poster's problem. If you want the poster to clarify the question or provide more information, please leave a comment instead, requesting additional details. When answering, please include specifics, such as step-by-step instructions, context for the solution, and links to useful resources. Also, please make sure that you answer complies with our Rules of Engagement.
You must be Logged in to submit an answer.

Up to 10 attachments (including images) can be used with a maximum of 1.0 MB each and 10.5 MB total.