Codemountain, Paulo Suzart's Blog

CEP com Scala + Drools Fusion

with 3 comments

Motivado pelo post de um grande “broder”, o Daniel Amadei, resolvi mostrar uma alternativa open source para o mostrado no seu post: Jb**s Drools Fusion.

Tive a oportunidade de trabalhar com o Drools como engine de promoções em um grande e-commerce brasileiro em 2008, e agora com o aumento do interesse por EDA (Event Driven Architecture), apresento um overview do Drools Fusion que promete trazer o conceito de processamento complexo de eventos sem demandar o aprendizado de uma nova linguagem ou ferramenta. Desenvolvedores já familiarizados com o Drools core podem começar imediatamente a construção de lógicas baseadas em eventos.

O Drools Fusion pode ser aplicado nos cenários clássicos de CEP como detecção de fraude, mercado financeiro e todo caso onde é necessário analisar um grande volume de eventos e obter um significado a partir deles.

Para exemplificar o uso do Drools Fusion, saí do ambiente corporativo e parti pra algo mais industrial. Imagine uma caldeira numa fábrica de alimentos.  Esta caldeira possui uma restrição de operação onde a tempreratura interna não pode sofrer as seguintes variações:

  • cenário 1: a caldeira não pode operar próxima do limite máximo de temperatura (acima dos 135 graus celsius) por mais de 20 segundos.
  • cenário 2: Dentro de 5 segundos, a variação de temperatura não deve passar de 10 graus celsius pra mais ou para menos. Caso contrário o aparelho corre o risco de explosão. Exemplo: se a caldeira opera a 90 graus e em 5 segundos a temperatura atingir 111 graus, o risco de explosão é eminente.

Em ambos os casos a medida de tempo está presente além da variável temperatura.  Para concluir o caso, suponha um sensor enviando os dados de temperatura para o nosso sistema.

Logo, Traduzindo para o Drools temos a o evento:

import cep.fusion.CEP.TemperatureChange //we need to import it
declare TemperatureChange
 @role(event) //set the role as an event
 @timestamp(time) //indicate the timestamp attribute in the event
end

Que provem da classe Scala:

case class TemperatureChange(@BeanProperty temperature : Double, @BeanProperty time : Date)

Cada variação na temperatura é um novo evento que guarda o novo valor. TemperatureChange possui um atributo – temperature – acessível por métodos get/set no Drools por conta da anotação @BeanProperty. O outro atributo é o tempo em que a mudança ocorre, fator importante para o disparo correto das regras baseadas em Sliding Windows e Temporal Reasoning. Se não informado um atributo como timestamp do evento, o momento de entrada do evento na memória do Drools Fusion é considerado seu timestamp. Eu experimentei problemas durante os testes por não ter escolhido e explicitado um timestamp para meus eventos.

Para atender o cenário 1, vamos criar a regra de Sliding Windows:

rule "Inside the limits for 20s"

 when
 $n : Number(doubleValue >= 135 && doubleValue <= 145) from
      accumulate (
          TemperatureChange($t: temperature)  over window:time(20s) from entry-point "temp-change",
               average($t)
                 )
 then
     System.out.println("About to Blow Up!!!!! Morhe than 20 seconds inside the limits. Average: " + $n);
end

Note a clausula from entry-point na regra. Ela é utilizada para que o Fusion leia constantemente os eventos do fluxo (Stream) de nome temp-change.

Um evento é inserido diretamente em um dado Stream quando fazemos o seguinte no nosso código Scala:

   val clock  = session.getSessionClock.asInstanceOf[SessionPseudoClock]
   val stream = session.getWorkingMemoryEntryPoint("temp-change")
   stream.insert(TemperatureChange(78))
   clock.advanceTime(5, TimeUnit.SECONDS)

Onde “temp-change” é o nome do Stream de eventos capturado na nossa regra.

Para facilitar nosso teste, o Drools Fusion fornece um “clock controlado”. Assim podemos inserir eventos na sessão e manter o controle da passagem do tempo como queiramos. Para o exemplo usei o PseudoClock e a cada inserção avanço o relógio em x segundos. Não podemos esquecer de avançar o timestamp de entrada dos eventos. Por isso, no código de exemplo criei uma função scala para avançar ambos ao mesmo tempo (função advanceSeconds).

Para o cenário 2 temos a regra considerando tempo entre eventos:


rule "High Variance"

 when
   $a : TemperatureChange(t: temperature) from entry-point "temp-change"
   $b : TemperatureChange(t2: temperature, ( eval ((t - temperature) >= 10) || eval ((temperature - t) >= 10)), this after[0s, 5s] $a) from entry-point "temp-change"
 then
   System.out.println("High Temperature Variance!!!!!");
end

A regra diz que, na ocorrência de uma mudança de temperatura, um nova ocorrência dentro dos limites de 0 a 5 segundos, acionará esta regra caso a variação seja maior que 10 graus. Isto é possível com o uso do this afert[0s, 5s] na regra.

Pronto, agora podemos testar nossas regras:


package cep.fusion;

import cep.fusion.CEP._
import CEP.MyTimer._
//other imports...

object CEPTest {
 def suite: Test = {
 val suite = new TestSuite(classOf[CEPTest]);
 suite
 }

 def main(args : Array[String]) {
 junit.textui.TestRunner.run(suite);
 }
}

/**
 * Unit test for CEP.
 */
class CEPTest extends TestCase("CEP") {

 implicit val (session, stream, clock) = start

 /**
 * Case 1 must fire exactly fire two rules
 */
 def testCase1 = {

 stream.insert(TemperatureChange(135, advanceSeconds(1)))
 stream.insert(TemperatureChange(140, advanceSeconds(3)))
 stream.insert(TemperatureChange(143, advanceSeconds(4)))
 assertEquals(1, session.fireAllRules)

 }

 /**
 * Case 1 must fire exactly fire one rule
 */
 def testCase2 = {

 stream.insert(TemperatureChange(120, advanceSeconds(2)))
 stream.insert(TemperatureChange(131, advanceSeconds(1)))
 assertEquals(1, session.fireAllRules)
 }

}

//run using mvn test

O código da aplicação que usa Scala em um projeto Maven com o Maven Scala Plugin pode ser baixado direto no meu github. Não explorei no post as configurações do Drools para não perder o foco, mais detalhes você encontra no código. depois de baixar o projeto, basta executar mvn test para ver os resultados.

CEP sem dúvida é algo cada vez mais próximo de se tornar mandatório em ambientes dinâmicos, onde ações preventivas ou corretivas podem ser disparadas através da detecção de padrões em inúmeros fluxos de eventos no momento em que eles ocorrem.

Para gerar um projeto no eclipse basta executar: mvn eclipse:eclipse😉

Bons estudos!

Written by paulosuzart

maio 11, 2010 às 3:31 pm

Publicado em cep, eda, scala

Tagged with , , ,

3 Respostas

Subscribe to comments with RSS.

  1. No fim, o comando para gerar o projeto para Eclipse não seria: “mvn eclipse:eclipse”? Acho que inverteu as letras ali. (;

    Leonardo Saraiva

    maio 11, 2010 at 6:45 pm

  2. Mto legal msm, so falta o bicho comecar a aprender e ajustar as regras dinamicamente rs

    David Souza

    maio 12, 2010 at 2:05 pm


Deixe uma resposta

Preencha os seus dados abaixo ou clique em um ícone para log in:

Logotipo do WordPress.com

Você está comentando utilizando sua conta WordPress.com. Sair / Alterar )

Imagem do Twitter

Você está comentando utilizando sua conta Twitter. Sair / Alterar )

Foto do Facebook

Você está comentando utilizando sua conta Facebook. Sair / Alterar )

Foto do Google+

Você está comentando utilizando sua conta Google+. Sair / Alterar )

Conectando a %s

%d blogueiros gostam disto: