在 Logstash 的过滤器中。有许多的过滤器是可以用来和外部的数据对 pipeline 里的数据进行丰富的。Jdbc_static 过滤器就是其中的一个。此过滤器使用从远程数据库预加载的数据丰富事件。我们之所以选择这个,是因为你不必频繁查询你的数据库,也不会给你的数据库带来很大的压力。
此过滤器最适合使用静态或不经常更改的参考数据(例如环境、用户和产品)来丰富事件。
此过滤器的工作方式是从远程数据库获取数据,将其缓存在本地内存中的 Apache Derby 数据库中,并使用查找来使用本地数据库中缓存的数据来丰富事件。 你可以将过滤器设置为一次加载远程数据(对于静态数据),或者你可以安排远程加载定期运行(对于需要刷新的数据)。
在本篇文章中,我们将详细地介绍如何使用这个过滤器来丰富数据。首先,我们可以参考我之前的另外一篇文章 “Logstash:运用 jdbc_streaming 来丰富我们的数据” 来设置如何访问 MySQL 数据以及在 Jdbc stack 过滤器中如何配置访问 MySQL。
安装 MySQL如果你还没有安装及配置好你的 MySQL,请参照我之前的文章 “Logstash:把 MySQL 数据导入到 Elasticsearch 中” 来进行安装及配置好自己的 MySQL 数据库。记得把相应的 Connector 放入相应的文件目录中。
安装好我们的 MySQL 后,我们创建一个叫做 data 的数据库,并创建一个叫做 sensors 及一个叫做 users 的表格:
上面的两个表格非常简单。我们可以使用 MySQL 命令查看它们的内容:
mysql> describe sensors; +-----------------+---------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-----------------+---------------+------+-----+---------+-------+ | customer | varchar(255) | YES | | NULL | | | room | varchar(255) | YES | | NULL | | | floor | varchar(255) | YES | | NULL | | | department | varchar(255) | YES | | NULL | | | locationOnFloor | varchar(255) | YES | | NULL | | | sensorType | varchar(255) | YES | | NULL | | | buildingName | varchar(255) | YES | | NULL | | | longitude | double(255,0) | YES | | NULL | | | latitude | double(255,0) | YES | | NULL | | | id | int(255) | NO | PRI | NULL | | +-----------------+---------------+------+-----+---------+-------+ 10 rows in set (0.00 sec) mysql> select * from sensors; +----------+------+---------+-------------+-----------------+-------------+--------------+-----------+----------+----+ | customer | room | floor | department | locationOnFloor | sensorType | buildingName | longitude | latitude | id | +----------+------+---------+-------------+-----------------+-------------+--------------+-----------+----------+----+ | Elastic | 101 | Floor 1 | Engineering | Desk 102 | Temperature | 222 Broadway | -74 | 41 | 1 | +----------+------+---------+-------------+-----------------+-------------+--------------+-----------+----------+----+ 1 row in set (0.00 sec)
mysql> describe users; +-----------+--------------+------+-----+---------+----------------+ | Field | Type | Null | Key | Default | Extra | +-----------+--------------+------+-----+---------+----------------+ | userid | int(11) | NO | PRI | NULL | auto_increment | | firstname | varchar(255) | YES | | NULL | | | lastname | varchar(255) | YES | | NULL | | +-----------+--------------+------+-----+---------+----------------+ 3 rows in set (0.00 sec) mysql> select * from users; +--------+-----------+----------+ | userid | firstname | lastname | +--------+-----------+----------+ | 2 | xiaoguo | liu | +--------+-----------+----------+ 1 row in set (0.00 sec)
这样,我们就完成了 MySQL 的部署。
使用 Jdbc 过滤器来丰富数据接下来,我们想使用 sensors 表格中的 id 以及 users 表格中的 userid 来对我们的事件进行丰富。我们来创建如下的一个 Logstash 的配置文件:
logstash.conf
input { generator { message => "id=1&userid=2" count => 1 } } filter { kv { source => "message" field_split => "&?" } mutate { convert => { "id" => "integer" "userid" => "integer" } } jdbc_static { loaders => [ { id => "remote-sensors" query => "select id, customer, room, sensorType from sensors" local_table => "sensors" }, { id => "remote-users" query => "select firstname, lastname, userid from users order by userid" local_table => "users" } ] local_db_objects => [ { name => "sensors" index_columns => ["id"] columns => [ ["id", "int"], ["customer", "varchar(255)"], ["room", "varchar(255)"], ["sensorType", "varchar(255)"] ] }, { name => "users" index_columns => ["userid"] columns => [ ["firstname", "varchar(255)"], ["lastname", "varchar(255)"], ["userid", "int"] ] } ] local_lookups => [ { id => "local-servers" query => "select customer, room, sensorType from sensors WHERe id = :id" parameters => {id => "[id]"} target => "server" }, { id => "local-users" query => "select firstname, lastname from users WHERe userid = :userid" parameters => {userid => "[userid]"} target => "user" } ] # using add_field here to add & rename values to the event root add_field => { sensor_name => "%{[server][0][customer]} %{[server][0][room]} %{[server][0][sensortype]}" } add_field => { customer_name => "%{[user][0][firstname]} %{[user][0][lastname]}" } # remove_field => ["server", "user"] jdbc_user => "root" jdbc_password => "1234" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # jdbc_driver_library => "" jdbc_connection_string => "jdbc:mysql://localhost:3306/data?useTimezone=true&&serverTimezone=UTC" } } output { stdout { codec => rubydebug } }
在上面,我们使用 generator 来创建一个事件。我们在 filter 的部分使用 kv 过滤器来对它进行解析。由于 id 和 userid 为字符串。它们和数据库表格中的字段属性是不一样的。我们使用 mutate 过滤器来对它们的类型进行转换。接下来,我们使用 jdbc_static 过滤器来对这个事件进行丰富。
我们通过如下的命令来启动 Logstash:
./bin/logstash -f logstash.conf
我们可以在 terminal 中看到如下的结果:
从上面,我们可以看出来有两个新增加的字段 sensor_name 以及 customer_name。这两个字段是从 MySQL 中的表格中进行丰富而来。更多关于 Jdbc static 过滤器的介绍请参阅官方文档。