Aggregate last three values

Hello,

I'm trying to identify extrema in a stream of stock data. I use a Window to get one record per symbol. Then I use two Flex operators to combine the last three values into a single record, so that I can compare the second (middle) value with the other two. Using a window and two Flex operators seems rather inefficient. Am I missing something?

Best regards,
Jonas
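To pin down the extremum test described in the question independently of CCL, here is a minimal Python sketch (a model, not CCL). It keeps a bounded three-element buffer per symbol, playing the role of `KEEP 3 ROWS PER (SYMBOL)`, so new symbols need no code changes, and it reports the middle value when it is strictly higher or lower than both neighbours. The function names are hypothetical, and treating equal neighbours (plateaus) as "no extremum" is an assumption.

```python
from collections import defaultdict, deque


def classify(prev, mid, nxt):
    """Return 'MAX' or 'MIN' if mid is a strict local extremum, else None."""
    if mid > prev and mid > nxt:
        return "MAX"
    if mid < prev and mid < nxt:
        return "MIN"
    return None


def detect_extrema(ticks):
    """ticks: iterable of (symbol, date, value) in arrival order.

    Yields (symbol, date_of_middle, value_of_middle, 'MAX' or 'MIN').
    """
    last_three = defaultdict(lambda: deque(maxlen=3))  # one buffer per symbol
    for symbol, date, value in ticks:
        buf = last_three[symbol]
        buf.append((date, value))  # a 4th append drops the oldest entry
        if len(buf) == 3:
            (_, v0), (d1, v1), (_, v2) = buf  # oldest, middle, newest
            kind = classify(v0, v1, v2)
            if kind:
                yield symbol, d1, v1, kind


ticks = [
    ("SAP", "d1", 10.0), ("IBM", "d1", 5.0),
    ("SAP", "d2", 12.0), ("IBM", "d2", 4.0),
    ("SAP", "d3", 11.0), ("IBM", "d3", 6.0),
]
print(list(detect_extrema(ticks)))
```

Note that an extremum can only be reported one tick late, when the value after it has arrived, which is why the middle of the three values is the one being tested.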

Accepted Solutions (1)


Hello Robert, thank you for your quick replies. Your answers helped me a lot in finding my solution. I need to know whether a value is a local extremum of the function represented by the stock data stream, so my solution looks a bit different.

CREATE INPUT STREAM STOCKDATA SCHEMA (
	SYMBOL string ,
	DATE double ,
	VALUE double ,
	VOLUME double ) 
	 ;
	
CREATE OUTPUT WINDOW LastThree PRIMARY KEY DEDUCED 
AS
	SELECT
		STOCKDATA.SYMBOL SYMBOL ,
		STOCKDATA.DATE DATE ,
		STOCKDATA.VALUE VALUE ,
		STOCKDATA.VOLUME VOLUME ,
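		/* last(VALUE, 0) is the most recent value, last(VALUE, 1) the one before it,
		   and last(VALUE, 2) the oldest of the three rows kept per SYMBOL */
		last ( STOCKDATA.VALUE, 0 ) FIRST_VALUE ,
		last ( STOCKDATA.VALUE, 1 ) SECOND_VALUE,
		last ( STOCKDATA.VALUE, 2 ) LAST_VALUE 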
		FROM STOCKDATA
		KEEP 3 ROWS PER (SYMBOL)
		GROUP BY STOCKDATA.SYMBOL 
	;

Is it possible to change the opcode inside a window, or to use the last() function (or something similar) inside a Flex operator? The rows of the window that follows the LastThree window keep "disappearing" if I don't put a Flex operator in between.
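To make the indexing in the LastThree window explicit, here is a small Python model (not CCL) of how `last(VALUE, n)` is assumed to behave over a window that keeps 3 rows per SYMBOL: index 0 is the newest row, and, as an assumption, a position with no row yet yields NULL (modeled as `None`). The class and method names are hypothetical.

```python
from collections import defaultdict, deque


class LastThreeModel:
    """Hypothetical Python model of a window keeping 3 rows per SYMBOL."""

    def __init__(self, keep=3):
        # One bounded buffer per symbol; appending a 4th row drops the oldest.
        self.rows = defaultdict(lambda: deque(maxlen=keep))

    def insert(self, symbol, value):
        self.rows[symbol].append(value)

    def last(self, symbol, n=0):
        """Value n rows back from the newest (n=0 is the newest), else None."""
        buf = self.rows.get(symbol, ())
        return buf[-1 - n] if 0 <= n < len(buf) else None

    def aggregate(self, symbol):
        # Column names taken from the LastThree window.
        return {
            "FIRST_VALUE": self.last(symbol, 0),   # newest row
            "SECOND_VALUE": self.last(symbol, 1),  # middle row
            "LAST_VALUE": self.last(symbol, 2),    # oldest row
        }


w = LastThreeModel()
for v in [10.0, 12.0, 11.0, 9.0]:
    w.insert("SAP", v)
print(w.aggregate("SAP"))
# prints {'FIRST_VALUE': 9.0, 'SECOND_VALUE': 11.0, 'LAST_VALUE': 12.0}
```

With these aliases FIRST_VALUE is the newest value and LAST_VALUE the oldest, and SECOND_VALUE is the candidate extremum. Until a symbol has three rows the missing positions are empty, so a downstream comparison should guard against that case.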

RobertWaywell
Product and Topic Expert

What does the next element in your current project look like? Is it a stream or a window? Are you outputting the results to a database table or other persistent data store?

Answers (4)

RobertWaywell
Product and Topic Expert

Here is a slightly different approach that only uses an input stream and one named window.

Note that the location of the KEEP policy statement for the Aggregate1 window is important. When a KEEP policy is specified in the FROM clause, it is applied to the data coming from that source. In other words, in this example we are telling the Aggregate1 window to keep 3 records from the STOCKDATA stream for each distinct value of SYMBOL.

In the previous example, the KEEP policy on the STOCKDATA input window told the STOCKDATA window to keep 3 records in that window for each distinct value of SYMBOL.

CREATE INPUT STREAM STOCKDATA SCHEMA 
(	TRADEID	INTEGER,
	SYMBOL string ,
	DATETIME SECONDDATE ,
	VALUE double ,
	VOLUME double ) ;
	
	
/**@SIMPLEQUERY=AGGREGATE*/
CREATE OUTPUT WINDOW Aggregate1 PRIMARY KEY DEDUCED
AS
	SELECT
		LAST(STOCKDATA.TRADEID)	LAST_TRADEID,
		STOCKDATA.SYMBOL SYMBOL ,
		LAST(STOCKDATA.DATETIME) LAST_DATETIME ,
		LAST(STOCKDATA.VALUE) CURRENT_VALUE ,
		MIN (STOCKDATA.VALUE) CURRENT_MIN_VALUE,
		MAX (STOCKDATA.VALUE) CURRENT_MAX_VALUE
	 FROM STOCKDATA 
	 	KEEP 3 ROWS PER ( SYMBOL) 
	 GROUP BY STOCKDATA.SYMBOL  ;
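As a rough Python model (not CCL) of what Aggregate1 computes for one symbol, assuming it sees only the three most recent rows: CURRENT_VALUE is the newest value, and CURRENT_MIN_VALUE / CURRENT_MAX_VALUE are taken over those rows. The function name is hypothetical. The model also makes visible that MIN and MAX alone do not say which of the three rows produced them.

```python
from collections import deque


def aggregate1(values, keep=3):
    """Yield (CURRENT_VALUE, CURRENT_MIN_VALUE, CURRENT_MAX_VALUE) for each new
    row of one symbol, computed over the last `keep` rows only (hypothetical)."""
    window = deque(maxlen=keep)  # oldest row is dropped automatically
    for v in values:
        window.append(v)
        yield v, min(window), max(window)


rows = list(aggregate1([10.0, 12.0, 11.0, 13.0]))
for row in rows:
    print(row)
```

In the third row the maximum (12.0) is the middle value, i.e. a local maximum; in the fourth row the maximum (13.0) is just the newest value. Comparing against MAX alone cannot tell these cases apart, so a local-extremum test still needs the middle value as well.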
RobertWaywell
Product and Topic Expert

Sorry about the delay, it took me a few days to get back to this.

You don't need to use a Flex element to calculate min or max values. You can use the built-in functions to calculate min and max in a single element and bring along the LAST() values in the same record:

/* Using an INPUT WINDOW lets you apply a KEEP policy to keep up to 3 records/rows per stock SYMBOL */

CREATE INPUT WINDOW STOCKDATA SCHEMA 
(	TRADEID	INTEGER,
	SYMBOL string ,
	DATETIME SECONDDATE ,
	VALUE double ,
	VOLUME double )
	PRIMARY KEY (TRADEID)
	KEEP 3 ROWS PER ( SYMBOL) ;
	
	
/**The OUTPUT window next in the flow is selecting data from the previous INPUT window. Since the STOCKDATA INPUT window is only keeping 3 records for each SYMBOL, the SELECT in the Aggregate1 OUTPUT window is only seeing 3 records per symbol. 

The GROUP BY clause tells us to calculate the aggregates on a per SYMBOL basis. The MIN() and MAX() calculations are applied to the records currently available for each symbol. The LAST() function gives us the most recent values of the TRADEID, DATETIME, and VALUE for each symbol  */

CREATE OUTPUT WINDOW Aggregate1 PRIMARY KEY DEDUCED
KEEP 3 ROWS PER ( SYMBOL )
AS
	SELECT
		LAST(STOCKDATA.TRADEID)	LAST_TRADEID,
		STOCKDATA.SYMBOL SYMBOL ,
		LAST(STOCKDATA.DATETIME) LAST_DATETIME ,
		LAST(STOCKDATA.VALUE) CURRENT_VALUE ,
		MIN (STOCKDATA.VALUE) CURRENT_MIN_VALUE,
		MAX (STOCKDATA.VALUE) CURRENT_MAX_VALUE
	 FROM STOCKDATA GROUP BY STOCKDATA.SYMBOL ;
RobertWaywell
Product and Topic Expert

What comparison are you doing in your flex operators?

What aggregate or aggregates are you calculating?

Can you share either the CCL or some pseudocode showing the process you are trying to implement?

I want to determine whether a value is higher or lower than both its previous and its following value, in order to find maxima and minima. This has to work with an arbitrary number of symbols in the stream, without changing the code.

My problem is that I can't find a CCL function or construct that aggregates the values of the last three records per symbol in a single call, so I use the previously described solution: a window and two Flex operators. The following code shows the aggregation of the first two values; the second Flex operator, which adds the third value, looks almost the same as the first. I tried for loops and window iterators, but couldn't find a satisfying solution. Besides, I'm worried that for loops and window iterators could hurt parallelization and performance.

CREATE INPUT STREAM STOCKDATA SCHEMA (SYMBOL string ,
	DATE string ,
	VALUE double ,
	VOLUME double ) ;

CREATE OUTPUT WINDOW SingleRecords PRIMARY KEY DEDUCED KEEP 1 ROWS PER ( SYMBOL ) as
	SELECT
		STOCKDATA.SYMBOL SYMBOL ,
		STOCKDATA.DATE DATE ,
		STOCKDATA.VALUE VALUE ,
		STOCKDATA.VOLUME VOLUME FROM STOCKDATA GROUP BY STOCKDATA.SYMBOL ;
		
CREATE FLEX LastTwoVals
IN SingleRecords 
OUT OUTPUT WINDOW LastTwoVals
SCHEMA (
 SYMBOL STRING ,
 NewVal DOUBLE,
 OldVal DOUBLE, 
 DATE STRING)
PRIMARY KEY (SYMBOL)
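BEGIN
 ON SingleRecords  {
 /* SingleRecords_old holds the previous row for this SYMBOL; it is null when there is no previous row (first insert) */
 if (isnull(SingleRecords_old)=0) {
 /* In the record literal, the field before '|' is the key field; the fields after it are data fields */
 output setOpcode([
 SYMBOL = SingleRecords.SYMBOL; |
 NewVal = SingleRecords.VALUE;
 OldVal = SingleRecords_old.VALUE;
 DATE = SingleRecords.DATE;
 ],upsert);
 }
 } ;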
END;
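For reference, here is a Python model (not CCL) of what the LastTwoVals Flex operator appears to do, under these assumptions: SingleRecords holds one row per symbol, each new tick for a known symbol arrives as an update whose previous row is available as SingleRecords_old, and the Flex upserts NewVal/OldVal keyed by SYMBOL. The first tick per symbol has no old row and produces no output. Function and variable names are hypothetical.

```python
def last_two_vals(rows):
    """Hypothetical model of the LastTwoVals Flex operator.

    rows: (symbol, date, value) tuples in arrival order. SingleRecords keeps one
    row per SYMBOL, so every tick after the first for a symbol is an update.
    """
    single_records = {}  # stands in for the SingleRecords window
    for symbol, date, value in rows:
        old = single_records.get(symbol)  # stands in for SingleRecords_old
        single_records[symbol] = (date, value)
        if old is not None:  # first insert per symbol has no old row: no output
            yield {"SYMBOL": symbol, "NewVal": value, "OldVal": old[1], "DATE": date}


events = list(last_two_vals([("SAP", "d1", 10.0), ("SAP", "d2", 12.0), ("SAP", "d3", 11.0)]))

# Upsert into the LastTwoVals output window, keyed by SYMBOL.
last_two = {}
for rec in events:
    last_two[rec["SYMBOL"]] = rec
print(last_two["SAP"])
```

A second stage of the same shape, fed from LastTwoVals, would add the third value; that second stage is the extra operator the question is trying to avoid.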