Esper学习之十EPL语法(六)_第1页
Esper学习之十EPL语法(六)_第2页
Esper学习之十EPL语法(六)_第3页
Esper学习之十EPL语法(六)_第4页
Esper学习之十EPL语法(六)_第5页
已阅读5页,还剩9页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

2014是新的一年,正好也是本人的本命年。既然是本命年,看来今年也是本人兴旺之年了。。。开了个小玩笑,同时也祝各位同行今年少调bug多涨工资,这才是最实际的。

年前的最后一篇说的是子查询和join,基本上epl的大部分简单语法都说完了。之前有朋友问我epl怎么和数据库交互,正好今天这篇就是来专门解释这个问题。但是要提醒各位,本篇只是说明了在epl中如何与数据库交互,并且只能算是简单的交互。而高级的用法会在esperio里有详细的指导(esperio的文档可在esper的官网找到)。

在esper的文档中,epl访问数据库的配置放在了比较靠后的位置,不过为了方便各位学习,这里会先说明和数据库交互的相关配置,然后再说epl怎么访问数据库。配置文件在官方esper包的etc文件夹下,大家可以参考着学习。

1.连接数据库a.JNDI获取连接配置如下:[html]\o"viewplain"viewplain\o"copy"copy<database-reference

name="mydb1">

<datasource-connection

context-lookup-name="java:comp/env/jdbc/mydb">

<env-property

name="java.naming.factory.initial"

value

="com.myclass.CtxFactory"/>

<env-property

name="vider.url"

value

="iiop://localhost:1050"/

>

</datasource-connection>

</database-reference>

database-reference的name是要连接的数据库名字,其余的配置可参考JNDI的文档使用方法:[java]\o"viewplain"viewplain\o"copy"copyif

(envProperties.size()

>

0)

{

initialContext

=

new

InitialContext(envProperties);

}

else

{

initialContext

=

new

InitialContext();

}

DataSource

dataSource

=

(DataSource)

initialContext.lookup(lookupName);

Connection

connection

=

dataSource.getConnection();

更多内容可参考JNDI的文档b.从连接池获取连接配置如下:(以dbcp为例)[html]\o"viewplain"viewplain\o"copy"copy<database-reference

name="mydb3">

<!--

For

a

complete

list

of

properties

see

Apache

DBCP.

-->

<datasourcefactory-connection

class-name="mons.dbcp.BasicDataSourceFactory">

<env-property

name="username"

value

="myusername"/>

<env-property

name="password"

value

="mypassword"/>

<env-property

name="driverClassName"

value

="com.mysql.jdbc.Driver"/>

<env-property

name="url"

value

="jdbc:mysql://localhost/test"/>

<env-property

name="initialSize"

value

="2"/>

<env-property

name="validationQuery"

value

="select

1

from

dual"/>

</datasourcefactory-connection>

<connection-lifecycle

value="pooled"/>

</database-reference>相同的配置可以使用esper的api达到同样的效果。代码如下:[java]\o"viewplain"viewplain\o"copy"copyProperties

props

=

new

Properties();

props.put("username",

"myusername");

props.put("password",

"mypassword");

props.put("driverClassName",

"com.mysql.jdbc.Driver");

props.put("url",

"jdbc:mysql://localhost/test");

props.put("initialSize",

2);

props.put("validationQuery",

"select

1

from

dual");

ConfigurationDBRef

configDB

=

new

ConfigurationDBRef();

//

BasicDataSourceFactory

is

an

Apache

DBCP

import

configDB.setDataSourceFactory(props,

BasicDataSourceFactory.class.getName());

configDB.setConnectionLifecycleEnum(ConfigurationDBRef.ConnectionLifecycleEnum.POOLED);

Configuration

configuration

=

new

Configuration();

configuration.addDatabaseReference("mydb3",

configDB);

同样,也可以自己实现数据源。示例如下:[java]\o"viewplain"viewplain\o"copy"copyconfigDB.setDataSourceFactory(props,

MyOwnDataSourceFactory.class.getName());

...

class

MyOwnDataSourceFactory

{

public

static

DataSource

createDataSource(Properties

properties)

{

return

new

MyDataSourceImpl(properties);

}

}

c.JDBC获取连接前提是要将对应的jdbc驱动假如classpath[html]\o"viewplain"viewplain\o"copy"copy<database-reference

name="mydb2">

<drivermanager-connection

class-name="com.mysql.jdbc.Driver"

url="jdbc:mysql://localhost:3306/mydb2"

user="myuser"

password="mypassword">

<connection-arg

name="user"

value

="myuser"/>

<connection-arg

name="password"

value

="mypassword"/>

<connection-arg

name="somearg"

value

="someargvalue"/>

</drivermanager-connection>

</database-reference>

注意:drivermanager-connection中的user和password属性必须填写,即使增加了connection-arg参数也不行。所以实际上connection-arg的user和password是不需要写的。这点我觉得esper做的不够人性化。d.其他关于数据库连接的配置下面是一些和数据库交互的配置,更多配置可参考Javadoc[html]\o"viewplain"viewplain\o"copy"copy<database-reference

name="mydb2">

...

configure

data

source

or

driver

manager

settings...

<connection-settings

auto-commit="true"

catalog="mycatalog"

read-only="true"

transaction-isolation="1"

/>

</database-reference>

下面是关于连接的生命周期的配置[html]\o"viewplain"viewplain\o"copy"copy<database-reference

name="mydb2">

...

configure

data

source

or

driver

manager

settings...

<connection-lifecycle

value="pooled"/><!--

retain

-->

</database-reference>

如果参数值为pooled,当配置了连接池,则会将每次获取的连接还给连接池。若没配置连接池,则每次获取的连接用完后就关闭。如果参数值为retain,则会将连接缓存到esper引擎中,这个epl用完后,另一个epl可以接着用2.查询结果缓存策略

EPL和数据库交互时会产生查询结果,所以引擎若能缓存查询结果将大大提高执行效率,因此esper提供了两种缓存模式。

a.LRUCache

LRU即least-recently-used,中文释义为“最近最少使用”,学过OS的应该知道内存缓存策略里也有这个算法,不明白的请自行搜索。配置如下:[html]\o"viewplain"viewplain\o"copy"copy<database-reference

name="mydb">

...

configure

data

source

or

driver

manager

settings...

<lru-cache

size="1000"/>

</database-reference>

size的参数值表示最多能缓存多少条查询结果,而不是大小

b.Expiry-timeCache

该策略为每一次的查询结果设置了存活期(注意不是每一条查询结果),并且定期清理过期的查询结果。配置如下:[html]\o"viewplain"viewplain\o"copy"copy<database-reference

name="mydb">

...

configure

data

source

or

driver

manager

settings...

<expiry-time-cache

max-age-seconds="60"

purge-interval-seconds="120"

ref-type="soft"/>

</database-reference>

max-age-seconds表示存活时间,purge-interval-seconds表示每隔多久清理一次过期的内容,两者单位都是秒。

ref-type有三个参数值:weak,soft,hard。表示查询结果的引用级别,JVM垃圾回收的时候,会根据此参数决定何时释放缓存。具体解释如下:

1).weak表示弱引用,JVM在垃圾回收的时候将清除所有的缓存,释放内存。

2).soft表示软引用,JVM在垃圾回收的时候,只有当所有的弱引用都被回收了才会清除所有的缓存并释放空间。

3).hard表示强引用,JVM的垃圾回收不会清除缓存,所以引擎将按照规定的存活期和清理时间管理缓存。

3.ColumnChangeCase

通常情况下,表字段是大小写不敏感的,但是也有设置为小写敏感的情况,我们可以通过设置使得查询返回的列结果为大写或者小写。配置如下:[html]\o"viewplain"viewplain\o"copy"copy<column-change-case

value="lowercase"/>

4.SQLTypesMapping

默认的数据库字段类型映射可以满足需求,不过想修改也是可以的。配置如下:[html]\o"viewplain"viewplain\o"copy"copy<sql-types-mapping

sql-type="2"

java-type="int"

/>

sql-type表示数据库字段类型,这里的2映射了具体类型,可在java.sql.Types类中查到,并且这个类里包含了大部分的数据库字段类型。java-type表示对应的java数据类型,大小写不敏感。

以上就是EPL和数据库交互的相关配置,下面来讲解EPL是怎么和数据库交互的。

EPL和数据库交互有两个前提,一是JDBC驱动能够预编译sql,而是JDBC驱动能获取数据库的元数据。

5.JoiningSQLQueryResults

通常我们想要的一种交互方式是:输入某个事件到引擎,然后引擎把事件的某个属性作为sql的查询条件交给JDBC驱动,执行sql。正好esper为此提供了相应的解决办法,参看语法:[plain]\o"viewplain"viewplain\o"copy"copysql:database_name

["

parameterized_sql_query

"]

sql是关键字不可少,parameterized_sql_query为sql语句,只与具体的DB有关,无关esper,所以数据库的那些个函数都可以用。先看一个简单的例子:[plain]\o"viewplain"viewplain\o"copy"copyselect

custId,

cust_name

from

CustomerCallEvent,

sql:MyCustomerDB

['

select

cust_name

from

Customer

where

cust_id

=

${custId}

']

引擎接收CustomerCallEvent事件,将事件的custId属性作为查询值,执行MyCustomerDB数据库的Customer表,其中查询条件为Customer的cust_id字段值存在,然后返回相应的custId属性值和cust_name字段值给监听器

该语法有几点需要注意:

a.sql需要用单引号或者双引号引起来,然后再用方括号括起来。

b.${expression}中可以是事件属性,可以是变量、常量等,也可以是用户自定义的函数。例如:[plain]\o"viewplain"viewplain\o"copy"copyselect

*

from

LimitEvent

le,

sql:MyCustomerDB

['

select

cust_name

from

Customer

where

amount

>

${max(varLowerLimit,

MyLib.getLimit(le))}

']

c.join的事件可以使用view,但是sql不可使用。不过可以将sql的查询结果通过insertinto输出到另外的事件,然后再使用view。例如:[plain]\o"viewplain"viewplain\o"copy"copyselect

customerId,

customerName

from

CustomerCallEvent.win:time(30

sec)

as

cce,

sql:MyCustomerDB

["select

cust_id

as

customerId,

cust_name

as

customerName

from

Customer

where

cust_id

=

${cce.custId}"]

as

cq

d.可以用as为表的字段设置别名,例如:[plain]\o"viewplain"viewplain\o"copy"copyselect

custId,

custName

from

CustomerCallEvent,

sql:MyCustomerDB

['

select

cust_name

as

custName

from

Customer

where

cust_id

=

${custId}

']

e.当使用事件的属性作为查询值是,属性名不要和字段名重名,否则会报错,esper无法识别

f.join的sql语句没有限制,并且可以使用where子句。例如:[plain]\o"viewplain"viewplain\o"copy"copyselect

symbol,

symbolDesc

from

OrderEvent

as

orders,

sql:My_Oracle_DB

['select

symbolDesc

from

SymbolReference']

as

reference,

sql:My_MySQL_DB

['select

orderList

from

orderHistory']

as

history

where

reference.symbol

=

orders.symbol

and

history.symbol

=

orders.symbol

除了普通的join,EPL也支持outerjoinsql语句,语法也没有什么改变。例如:[plain]\o"viewplain"viewplain\o"copy"copyselect

custId,

custName

from

CustomerCallEvent

as

cce

left

outer

join

sql:MyCustomerDB

["select

cust_id,

cust_name

as

custName

from

Customer

where

cust_id

=

${cce.custId}"]

as

cq

on

cce.custId

=

cq.cust_id

6.UsingPatternstoRequestData

除了通过传递外部数据查询数据库,也可以用pattern定时或者以固定频率查询数据库。例如:[plain]\o"viewplain"viewplain\o"copy"copyinsert

into

NewOrders

select

orderId,

orderAmount

from

pattern

[every

timer:interval(5

sec)],

sql:MyCustomerDB

['select

orderId,

orderAmount

from

NewOrders']

pattern语法之后再说,这里只让大家知道有这么一个用法。7.PollingSQLQueriesviaAPI

Esper提供了API直接执行EPL来达到访问数据库的目的。请看下面的代码:[java]\o"viewplain"viewplain\o"copy"copypackage

example;

import

com.espertech.esper.client.Configuration;

import

com.espertech.esper.client.EPAdministrator;

import

com.espertech.esper.client.EPRuntime;

import

com.espertech.esper.client.EPServiceProvider;

import

com.espertech.esper.client.EPServiceProviderManager;

import

com.espertech.esper.client.EPStatement;

import

com.espertech.esper.client.EventBean;

import

java.util.Iterator;

/**

*

Created

by

Luonanqin

on

4/17/14.

*/

public

class

IteratorSQLTest

{

public

static

void

main(String[]

args)

throws

InterruptedException

{

Configuration

config

=

new

Configuration();

config.configure("esper.examples.cfg.xml");

config.addVariable("vari",

Integer.class,

1);

EPServiceProvider

epService

=

EPServiceProviderManager.getDefaultProvider(config);

EPAdministrator

admin

=

epService.getEPAdministrator();

EPRuntime

runtime

=

epService.getEPRuntime();

//

id=1,

name="luonq"

String

epl1

=

"select

id,

name

from

sql:test['select

id,

name

from

test1

where

id=${vari}']";

EPStatement

state

=

admin.createEPL(epl1);

Iterator<EventBean>

iter

=

state.iterator();

//

也可以调用safeIterator方法,该方法以线程安全方式查询DB

while

(iter.hasNext())

{

EventBean

eventBean

=

iter.next();

System.out.println(eventBean.get("id")

+

"

"

+

eventBean.get("name"));

}

}

}

执行结果:[plain]\o"viewplain"viewplain\o"copy"copy1

luonq

8.SQLInputParameterandColumnOutputConversion

刚才数据库配置里面有说到可以修改数据库字段类型和java数据类型的映射关系,但是那只是针对全局的设置,如果想针对EPL来设置映射关系,可以实现SQLColumnTypeConversion接口,然后通过注解Hook调用实现类。具体代码及解释如下:[java]\o"viewplain"viewplain\o"copy"copyimport

com.espertech.esper.client.hook.SQLColumnTypeContext;

import

com.espertech.esper.client.hook.SQLColumnTypeConversion;

import

com.espertech.esper.client.hook.SQLColumnValueContext;

import

com.espertech.esper.client.hook.SQLInputParameterContext;

/**

*

*

MySQLColumnTypeConvertor必须为public类,不然无法实例化。

*

Esper会为每一个EPL实例,即EPStatement提供一个Convertor实例

*

*

该例子没有做任何转换。

*

Created

by

Luonanqin

on

2/9/14.

*/

public

class

MySQLColumnTypeConvertor

implements

SQLColumnTypeConversion{

//

转换列的类型

public

Class

getColumnType(SQLColumnTypeContext

context)

{

Class

clazz

=

context.getColumnClassType();

return

clazz;

}

//

转换列的值

public

Object

getColumnValue(SQLColumnValueContext

context)

{

Object

obj

=

context.getColumnValue();

return

obj;

}

//

转换传入的参数值

public

Object

getParameterValue(SQLInputParameterContext

context)

{

Object

obj

=

context.getParameterValue();

return

obj;

}

}

package

example;

import

com.espertech.esper.client.Configuration;

import

com.espertech.esper.client.EPAdministrator;

import

com.espertech.esper.client.EPServiceProvider;

import

com.espertech.esper.client.EPServiceProviderManager;

import

com.espertech.esper.client.EPStatement;

import

com.espertech.esper.client.EventBean;

import

java.util.Iterator;

/**

*

MySQLColumnTypeConvertor必须为public类,不然无法实例化。

Esper会为每一个EPL提供一个Convertor实例

*

*

Created

by

Luonanqin

on

2/9/14.

*/

public

class

SQLColumnTypeConversionTest

{

public

static

void

main(String[]

args)

throws

InterruptedException

{

Configuration

config

=

new

Configuration();

config.configure("esper.examples.cfg.xml");

config.addVariable("vari",

Integer.class,

1);

EPServiceProvider

epService

=

EPServiceProviderManager.getDefaultProvider(config);

EPAdministrator

admin

=

epService.getEPAdministrator();

//

id=1,

name="luonq"

String

epl1

=

"@Hook(type=HookType.SQLCOL,

hook='"

+

MySQLColumnTypeConvertor.class.getName()

+

"')select

id,

name

from

sql:test['select

id,

name

from

test1

where

id=${vari}']";

System.out.println(epl1);

EPStatement

state1

=

admin.createEPL(epl1);

Iterator<EventBean>

iter

=

state1.iterator();

while

(iter.hasNext())

{

EventBean

eventBean

=

iter.next();

System.out.println(eventBean.get("id")

+

"

"

+

eventBean.get("name"));

}

}

}

执行结果:[plain]\o"viewplain"viewplain\o"copy"copy@Hook(type=HookType.SQLCOL,

hook='example.MySQLColumnTypeConvertor')select

id,

name

from

sql:test['select

id,

name

from

test1

where

id=${vari}']

1

luonq

9.SQLRowPOJOConversion

刚才说的列类型的转换以及列结果的转换,只是普通的转换。Esper还支持表的查询结果按行转换,比如说转换为POJO,而不像之前那样只能针对每一个字段结果单独进行转换。用法也是通过Hook注解来调用转换类。代码如下:[java]\o"viewplain"viewplain\o"copy"copyimport

java.sql.ResultSet;

import

java.sql.SQLException;

import

com.espertech.esper.client.hook.SQLOutputRowConversion;

import

com.espertech.esper.client.hook.SQLOutputRowTypeContext;

import

com.espertech.esper.client.hook.SQLOutputRowValueContext;

/**

*

Created

by

Luonanqin

on

2/10/14.

*/

public

class

MySQLOutputRowConvertor

implements

SQLOutputRowConversion

{

//

每行查询结果转换后的类型

public

Class

getOutputRowType(SQLOutputRowTypeContext

context)

{

return

String.class;

}

//

返回转换后的内容

public

Object

getOutputRow(SQLOutputRowValueContext

context)

{

ResultSet

result

=

context.getResultSet();

Object

obj1

=

null;

Object

obj2

=

null;

try

{

obj1

=

result.getObject("id");

obj2

=

result.getObject("name");

}

catch

(SQLException

e)

{

e.printStackTrace();

}

return

obj1

+

"

and

"

+

obj2;

}

}

package

example;

import

com.espertech.esper.client.Configuration;

import

com.espertech.esper.client.EPAdministrator;

import

com.espertech.esper.client.EPServiceProvider;

import

com.espertech.esper.client.EPSer

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论