/ NOTE(review): this file appears to have lost its original line breaks (many source
/ lines fused into a few physical lines) and shows signs of "<...>" spans having been
/ stripped by an HTML/text extractor — some code later in the file is missing. The
/ text below is preserved byte-for-byte; only comment lines are added.
/ This section: runtime configuration defaults, overridable via command-line args
/ (see processargs). bw = basic-window length in points, sw = sliding-window length,
/ threshold = correlation report threshold, duration = minimum run length to report.
/ flag* variables select which statistics are computed per basic window.
/ Code author: Yunyue Zhu, 2001-2002. / Joint work with Dennis Shasha. / stat.k / k stat [+h host] [+p port] [+out1 file1] [+out2 file2] [+l sliding window length] / [+f frequency to update] [+m] [+a] [+b] [+s] [+inter|intra] [+t threshold] [+d minimum duration] \l db /TIME TESTING HARNESS (REMOVE / if you want this) /\l time //communication parameters /the filename for single stream statistics output outfile1:"./output1" /the filename for sterams correlation statistics output outfile2:"./output2" /the default server host and port host:` port:2000 //basic parameters /the length of the basic window, number of points bw: 32 /the length of the sliding window, number of points sw: 64 RECROOTSW :1%_sqrt sw threshold: 0.90 duration:3 //flags flagCategory: 0 flagBfs: 1 /if this flag is set, we will compute the BFS(best fit slope). flagAutocorr: 0 /if this flag is set, we will compute the Autocorrelation(at lag 1). flagMinMax : 1 /if this flag is set, we will compute the Min Max statistics flagBeta : 0 /if this flag is set, we will compute the Beta flagDft : 1 /if this flag is set, we will compute the correlation with DFT coefficients. flagPrecise: 0 /if this flag is set, we will compute the correlation precisely.
/ processargs[args]: parse command-line arguments into the global configuration.
/ okflag protocol: 1 = option consumed a value (advance by 2 via i+:1+okflag),
/ 0 = bare flag (advance by 1), -1 = unrecognized (print usage, loop exits).
/ FIX(review): the "+a" option previously assigned flagAutoCorr (capital C), a name
/ never read anywhere else; the flag consulted by the digest code and output headers
/ is flagAutocorr, so "+a" was a no-op. Corrected to flagAutocorr.
/ Also on this line: standard statistics one-liners (avg/var/std/cov/corr), the
/ per-basic-window DFT (dft uses precomputed COS/SIN tables; dfthash uses REAL/IMAGE),
/ and the start of batchDigestUpdate, which folds one basic window of bw new points
/ into a stream's rotating digest (nb+1 basic-window slots; ptr1 is the oldest slot).
/ NOTE(review): batchDigestUpdate continues on the next physical line — original
/ line breaks were lost in this file.
flagHash: 1 /-------------------------------------------------------------------------------------------------------------------- / functions /-------------------------------------------------------------------------------------------------------------------- //input args processing processargs:{[args] okflag: 0 i: 0 while[(i < #args) & ~okflag=-1 okflag:: -1 if[args[i] ~ "+out1";outfile1:: args[i+1];okflag:: 1] if[args[i] ~ "+out2";outfile2:: args[i+1];okflag:: 1] if[args[i] ~ "+h";host:: args[i+1];okflag:: 1] if[args[i] ~ "+p";port:: 0 $ args[i+1];okflag:: 1] if[args[i] ~ "+l";sw:: 0 $ args[i+1];okflag:: 1] if[args[i] ~ "+f";bw:: 0 $ args[i+1];okflag:: 1] if[args[i] ~ "+t";threshold:: 0 $ args[i+1];okflag:: 1] if[args[i] ~ "+d";duration:: 0 $ args[i+1];okflag:: 1] if[args[i] ~ "+m";flagMinMax:: 1;okflag:: 0] if[args[i] ~ "+a";flagAutocorr:: 1;okflag:: 0] if[args[i] ~ "+s";flagBfs:: 1;okflag:: 0] if[args[i] ~ "+b";flagBeta:: 1;okflag:: 0] if[args[i] ~ "+intra";flagCategory:: 1;okflag:: 0] if[args[i] ~ "+inter";flagCategory:: 2;okflag:: 0] i+: 1 + okflag if[-1 = okflag s : "format is k stat [+h host] [+p port] [+out1 file1] [+out2 file2]" s,: "[+l sliding window length] [+f frequence to update] [+m] [+a] [+b]" s,: "[+inter|intra] [+t threshold] [+d minimum duration] \n" ` 0: s ] ] } //standard statistics functions /average avg:{(+/ x) % # x} var:{avg[_sqr x] - _sqr avg[x]} std:{_sqrt var[x]} cov:{avg[x * y] - avg[x] * avg[y]} corr:{ (cov[x;y])%((std[x]) * (std[y]))} //Discret Fourier Transform in one line dft:{[x]:((+/x*COS),(+/x*SIN))*2%bw} dfthash:{[x] /x: the data /return the 1 to m coefficient components :(+/REAL*x),(+/IMAGE*x) } batchDigestUpdate:{[digest;x] //x: 1*bw vecotr, the incoming time series within a basic window in batch, bw points //return: new digest after the next basic windows is update with new values x digest.timepoint+:bw i:digest.ptr1 :[i=nb;next:0;next:i+1] digest[`windows;i;`mean] : (+/x)%bw digest[`windows;i;`sum2] : +/x*x
/ Continuation of batchDigestUpdate: per-basic-window statistics (bfs weighted sum,
/ lag-1 autocorrelation carry via `lastx, DFT coefficients, grid-hash coefficients,
/ raw series, min/max), then the sliding-window running sums: the slot at ptr1 (i)
/ replaces the oldest slot (next), so sum/sum2 are adjusted by new-minus-old.
/ NOTE(review): ":[digest.timepointsw+1;" is damaged text — an extractor appears to
/ have deleted a "<...>" span here (likely a comparison like timepoint<sw+1 and one
/ branch of the conditional). Left byte-identical; recover from the original source.
/ prodUpdate[D1;D2;prod]: incrementally maintain the inner product of two streams
/ over the sliding window, either from DFT coefficients (flagDft) or from the raw
/ per-window series (flagPrecise); at warm-up (h=nb-1, h is the global loop counter)
/ it returns a full recomputation instead of the incremental update.
/ correlation[D1;D2;prod]: Pearson correlation from the maintained product and the
/ digests' means/stds. corrdft (begins here, continues on the next physical line):
/ same quantity recomputed directly from the DFT digests; `index` enumerates the
/ nb live basic-window slots in rotation order starting after ptr1.
if[flagBfs;digest[`windows;i;`bfs]:+/x*!bw] if[flagAutocorr digest[`windows;i;`aut] : +/*':(digest[`windows;i;`lastx],x) digest[`windows;next;`lastx]:*|x ] if[flagDft; digest[`windows;i;`dftcoef] : dft[x]] if[flagHash;digest[`windows;i;`dfthash] : dfthash[x]] if[flagPrecise;digest[`windows;i;`series] : x] if[flagMinMax; digest[`windows;i;`min]:&/x;digest[`windows;i;`max]:|/x] digest.sum +: bw*digest[`windows;i;`mean]-digest[`windows;next;`mean] digest.sum2 +: digest[`windows;i;`sum2]-digest[`windows;next;`sum2] :[digest.timepointsw+1; digest.bfs0+:(nb*bw*bw*digest[`windows;i;`mean])+digest[`windows;i;`bfs]-digest[`windows;next;`bfs]+bw*digest.sum] ] if[flagHash digest.dfthash : ((digest.dfthash[!nc]*PHASE1)-(digest.dfthash[nc+!nc]*PHASE2)),((digest.dfthash[!nc]*PHASE2)+(digest.dfthash[nc+!nc]*PHASE1)) digest.dfthash+: digest[`windows;i;`dfthash]-digest[`windows;next;`dfthash] digest.dftnorm: digest.dfthash%digest.std*sw ] digest.ptr1:next :digest } prodUpdate:{[D1;D2;prod] old:D1.ptr1 :[old=0;new:nb;new:old-1] if[flagDft if[h=nb-1 :((+//D1.windows[;`dftcoef]*D2.windows[;`dftcoef])%2)++/D1.windows[;`mean]*D2.windows[;`mean] ] prod-:(0.5*+/D1.windows[old;`dftcoef]*D2.windows[old;`dftcoef])+D1.windows[old;`mean]*D2.windows[old;`mean] prod+:(0.5*+/D1.windows[new;`dftcoef]*D2.windows[new;`dftcoef])+D1.windows[new;`mean]*D2.windows[new;`mean] ] if[flagPrecise if[h=nb-1 :+/D1.windows[;`series]*D2.windows[;`series] ] prod-:+/D1.windows[old;`series]*D2.windows[old;`series] prod+:+/D1.windows[new;`series]*D2.windows[new;`series] ] :prod } correlation:{ [D1;D2;prod] //computing the correlation coeficients given the digest of two series x and y of digests D1 and D2 cov: (prod%nb)-(D1.mean*D2.mean) :cov%(D1.std*D2.std) } corrdft:{ [D1;D2] //computing the correlation coeficients given the digest of two series x and y of digests D1 and D2 index: 1 _ (D1.ptr1+!(nb+1))!nb+1 cov:
/ Tail of corrdft, then corrshort[D1;D2;b] — the same DFT-based correlation over a
/ shorter sliding window of b basic windows (note: corrshort still divides by the
/ full-window D1.std*D2.std, as in the original).
/ FIX(review): corrdft read D2.windows[index;`meam] — a field that is never written;
/ every other site writes/reads `mean (see batchDigestUpdate and corrshort), so
/ corrdft's mean cross-term was taken from a nonexistent field. Corrected to `mean.
/ NOTE(review): the region from "betadft:{" through the duration/timestamps
/ bookkeeping and "if[~durations[index;j]5)&(flagCategory=0)" is badly damaged —
/ an extractor deleted "<...>" spans, fusing betadft, the per-stream output routine,
/ and the duration-report logic and dropping code between them. That text is left
/ byte-identical below; it must be restored from the original stat.k, not guessed.
/ Remainder of this line: reconcile flagHash/flagDft and pick nc (number of DFT
/ coefficients), size the stream set from the newdata table, build the pairs table
/ (all pairs, intra-category, or inter-category per flagCategory), and begin
/ allocating the per-pair durations triangle (loop continues on the next line).
((((+//D1.windows[index;`dftcoef]*D2.windows[index;`dftcoef])%2)+(+/D1.windows[index;`mean]*D2.windows[index;`mean]))%nb)-(D1.mean*D2.mean) :cov%(D1.std*D2.std) } corrshort:{ [D1;D2;b] //computing the correlation coeficients of shorter sliding window of length b index: 1 _ (D1.ptr1+!(b+1))!b+1 D1mean: (+/D1.windows[index;`mean])%b D2mean: (+/D2.windows[index;`mean])%b cov: ((((+//D1.windows[index;`dftcoef]*D2.windows[index;`dftcoef])%2)+(+/D1.windows[index;`mean]*D2.windows[index;`mean]))%b)-(D1mean*D2mean) :cov%(D1.std*D2.std) } betadft:{ [D1;D2] //computing the beta of x to y given their digests D1,D2 :[D1.timepointsw); s,:",",($(digest.bfs0-digest.mean*bfs1)%bfs2)] if[flagBfs&(digest.timepoint0;a:(x-1),x;a:x] if[xthreshold :[timestamps[index;j]=h-1 durations[index;j]+:1 durations[index;j]:1 ] timestamps[index;j]:h if[~durations[index;j]5)&(flagCategory=0) flagHash:1 flagDft:0 nc : 6 if[nc >bw; nc=bw] ] if[flagDft //in th ecase when we are not use the grid-hash algorithms flagHash : 0 tolerance : 0.03 threshold-:tolerance nc:(_ bw%64)- _ nb%10 if[nc<3;nc:3] ] nn: nc*2 /the number of DFT coeffients (nn/2 for cose and sine each) //get newdata t: handle 4: "select count($) from newdata" m: _ t[1;0]%(bw*n_stream) /the whole length of stream //processing of categories stat.i:0;stat.j:0;stat.prod:0;stat.dur:0;//stat.delay:0 pairs:(-n_stream+n_stream*n_stream) # stat cat: handle 4: "select from category" k:0;i:0 do[n_stream-1 j:i+1 do[-1+n_stream-i if[flagCategory=0;pairs[k;`i]:i;pairs[k;`j]:j;k+:1] if[flagCategory=1 //within categories if[cat[1;1;i]=cat[1;1;j] pairs[k;`i]:i pairs[k;`j]:j k+:1 ] ] if[flagCategory=2 //between categories //if[~category.cid[i]=category.cid[j];pairs[k;`i]:i;pairs[k;`j]:j;k+:1] if[~cat[1;1;i]=cat[1;1;j] pairs[k;`i]:i pairs[k;`j]:j k+:1 ] ] j+:1 ] i+:1 ] pairs:pairs[!k] //the data structure for maintain duration information /given a pair (i,j) (i=1..ns,j=0..i-1) get the duration or timestamp be A[i;j] durations:n_stream # () i:0 do[n_stream
/ Tail of the durations-triangle allocation, then the digest data-structure
/ templates: basicwin (one basic window's statistics; fields allocated only for the
/ enabled flags) and digest (per-stream sliding window: nb+1 rotating basicwin
/ slots plus running sum/sum2/mean/std, hash coefficients, bfs accumulator, and the
/ rotation pointers ptr1/ptr2). data is the per-stream array of digests.
/ Then the CSV headers for the two output files and the COS/SIN tables precomputed
/ for dft (nc frequencies over one basic window of bw points).
/ FIX(review): the pair-report header named its first two columns
/ "streamID1,streamID1"; the report is per stream PAIR (i,j — see the main loop),
/ so the second column is the second stream: corrected to "streamID1,streamID2".
durations[i]:i#0 i:i+1 ] timestamps:durations //digest data structure /basic window digests basicwin.mean:0 /mean(xi) within basic window basicwin.sum2:0 /sum(xi*xi) within basic window if[flagDft; basicwin.dftcoef: nn#0] /list of DFT coeffients, start with /nn/2 cosine coeffients and the nn/2 sine coeffients if[flagHash; basicwin.dfthash: nn#0] /DFT coeffients components if[flagPrecise; basicwin.series: bw#0] /the original time series if[flagMinMax; basicwin.min:0;basicwin.max:0] /the min and max if[flagBfs; basicwin.bfs:0] /for the computation of BFS,sum(xi*i)i=0,...,bw if[flagAutocorr;basicwin.aut:0;basicwin.lastx:0] /sum(xi*x_{i-1}), for the computing /of first order autocorrelation;lastx is the lastvalue in the last basic window /sliding window digests digest.windows: (nb+1) # basicwin /the digest of basic windows digest.sum: 0 /sum(xi) within sliding window digest.sum2: 0 /sum(xi*xi) within sliding window digest.mean: 0 /mean(xi) within sliding window digest.mean2: 0 /mean(xi*xi) within sliding window digest.std: 0 /standard deviation within sliding window if[flagHash digest.dfthash: nn#0 digest.dftnorm: nn#0 ] /DFT coeffients for the whole sliding window if[flagBfs;digest.bfs0:0] /sum(xi*i), for the computing of BFS(best fit slope) digest.timepoint:1 /accumulating time point, no rotating digest.ptr1:0 /the window to be updated(the oldest window) digest.ptr2:0 /the position to be updated within basic window(the oldest data point) data: n_stream # digest /all the digests of the data streams, using rotating windows //output files hearder outs1:"Stream ID,BeginTime,EndTime,Average,Standard_Deviation" if[flagMinMax; outs1,:",Max,Min"] if[flagBfs; outs1,:",Best_Fit_Slope"] if[flagAutocorr;outs1,:",Autocorrelation"] if[flagBeta;outs1,:",Beta"] outs1,:"\n" outs2:"streamID1,streamID2,Begin Time Point,End Time Point,Correlation Coefficient,Duration\n" //Precompute before the stream come in t:!bw arg:2*3.1415926%bw p:1+!nc COS:_cos(arg*p*/:t) SIN:_sin(arg*p*/:t)
/ Precompute for best-fit-slope (bfs1/bfs2 over sw points) and for the grid-hash
/ scheme (REAL/IMAGE full-window DFT tables, PHASE1/PHASE2 rotation factors for
/ sliding the hash coefficients by bw points, and the grid geometry P/PP/R/List).
/ MAIN PROGRAM: for each of m basic-window steps, pull bw*n_stream new values from
/ the newdata table via `handle`, fold each stream's bw points into its digest with
/ batchDigestUpdate, append per-stream statistics to outs1, and once warmed up
/ (h>nb-2) either scan all pairs incrementally (prodUpdate+correlation, timing in
/ pariwiseTime [sic — original spelling]) or run the grid-hash Report pass.
/ NOTE(review): "if[~pairs[k;`dur]-threshold) pairs[k;`dur]:0" is damaged — an
/ extractor deleted a "<...>" span (plausibly the duration comparison against
/ `duration` and the report-append for outs2). Left byte-identical; restore from
/ the original source. Also note this file's original line breaks were lost; the
/ final string literal here is continued on the next physical line.
if[flagBfs q:!sw bfs1:+/q //Sum(i),i=0,..,sw-1 bfs2:(+/q*q)-sw*((+/q)%sw)*((+/q)%sw) //Sum(i*i)-sw*mean(i)^2,i=0,..,sw-1 ] /for dfthash if[flagHash //for the digest computing mm:1+!nc t:bw-!bw arg:2*3.1415926%sw REAL : _cos(arg*mm*/:t) IMAGE: _sin(arg*mm*/:t) PHASE1: _cos(arg*mm*bw) PHASE2: _sin(arg*mm*bw) //for the grid structure unit:_sqrt(1-threshold) unit_r:1%unit whole:0.707 P:2*1+_ whole%unit; PP:_ P%2 R:3 //grid is P*P*...*P=P^R List:_ P^!R ] //MAIN PROGRAM "start..." start: _t //start time for all the computation start2: 0 //start time for steady stage computing pariwiseTime: 0 // the time need for pairwise correlation computation gridhashTime: 0 // the time need for Grid-Hash correlation report readpointer:0 last:1 count:0 total:0 h:0 do[m //computing digest index:0 last:(data[index;`timepoint]-1)-sw;if[last<1;last:1] do[n_stream //get new data s :"select value from newdata where $ within (",($readpointer),"," s, :($readpointer+(bw*n_stream)-1),"),streamID='",($ids[index]),"'" t: handle 4: s rw:t[1;0] //compute digest data[index]:batchDigestUpdate[data[index];rw] index+:1 ] readpointer+:bw*n_stream //output index:0 do[n_stream;outs1,:output[ids[index];data[index];beta[index]];index+:1] //computing correlation if[h>nb-2 if[h=nb;start2:_t] //BEGIN----pairwaise correlation computation if[~flagHash k:0 do[#pairs i:pairs[k;`i];j:pairs[k;`j] pairs[k;`prod]:prodUpdate[data[i];data[j];pairs[k;`prod]] corr:correlation[data[i];data[j];pairs[k;`prod]] //corr:corrdft[data[i];data[j]] if[(corr>threshold)|(corr<-threshold) s:,/("i=",$i,",j=",$j,",corr=",corr," Duration:",pairs[k;`dur]) count+:1;pairs[k;`dur]+:1; if[~pairs[k;`dur]-threshold) pairs[k;`dur]:0 ] total+:1 k+:1 ] pariwiseTime:_t-start2 ] //END----pairwaise correlation computation //BEGIN----Gird-Hash correlation computation if[flagHash grid:(_ P^R)#0 index:0;do[n_stream;Report[index];index+:1] gridhashTime:_t-start2 ] //END----Gird-Hash correlation computation ] h+:1 ] //Report time and output ("Time
/ Final reporting: print total elapsed time and record count, write the two
/ accumulated CSV buffers to their files (0: is the k text-file write verb), then
/ the commented-out timing-harness dump. NOTE(review): this physical line begins
/ mid string-literal ("Time required:") because the file's original line breaks
/ were lost; the trailing "/zz: xx @" is truncated commented-out harness code.
required:"),($_t-start),(" seconds for "),($m*bw*n_stream)," records." /("Time required:"),($(_t-start2)%(m-nb)*bw)," seconds per time point." outfile1 0: outs1 outfile2 0: outs2 / TIME TESTING HARNESS (REMOVE / if you want this) /.time.sum[] /xf: .TIME.f /xt: .TIME.t /xx: xf,'xt /zz: xx @