ELK
1、核心组成
ELK 是一个应用套件,由 Elasticsearch、Logstash 和 Kibana 三部分组件组成,简称 ELK;它是一套开源免费、功能强大的日志分析管理系统。ELK 可以将我们的系统日志、网站日志、应用系统日志等各种日志进行收集、过滤、清洗,然后进行集中存放并可用于实时检索、分析。 这三款软件都是开源软件,通常是配合使用,而且又先后归于 Elastic.co 公司名下,故又被简称为 ELK Stack。下图是 ELK Stack 的基础组成。
2、Elasticsearch 介绍
Elasticsearch 是一个实时的分布式搜索和分析引擎,它可以用于全文搜索,结构化搜索以及分析,采用 Java语言编写。它的主要特点如下:
- 实时搜索,实时分析
- 分布式架构、实时文件存储,并将每一个字段都编入索引
- 文档导向,所有的对象全部是文档
- 高可用性,易扩展,支持集群(Cluster)、分片和复制(Shards和Replicas)
- 接口友好,支持 JSON
Elasticsearch 支持集群架构,典型的集群架构如下图所示:
从图中可以看出,Elasticsearch 集群中有 Master Node 和 Slave Node 两种角色,其实还有一种角色 Client Node,这在后面会做深入介绍。
3、Logstash 介绍
Logstash 是一款轻量级的、开源的日收集处理框架,它可以方便的把分散的、多样化的日志搜集起来,并进行自定义过滤分析处理,然后传输到指定的位置,比如某个服务器或者文件。Logstash 采用 JRuby 语言编写。它的主要特点如下:
- input:数据收集
- filter:数据加工,如过滤,改写等
- output:数据输出
别看它只做三件事,但通过组合输入和输出,可以变幻出多种架构实现多种需求。Logstash 内部运行逻辑如下图所示:
其中,每个部分含义如下:
- Shipper:主要用来收集日志数据,负责监控本地日志文件的变化,及时把日志文件的最新内容收集起来,然后经过加工、过滤,输出到 Broker。
- Broker:相当于日志 HUb,用来连接多个 Shipper 和多个 Indexer。
- Indexer:从 Broker 读取文本,经过加工、过滤,输出到指定的介质(可以是文件、网络、elasticsearch 等)中。
Redis 服务器是 Logstash 官方推荐的 Broker,这个 Broker 起数据缓存的作用,通过这个缓存器可以提高 Logstash Shipper 发送日志到 Logstash Indexer 的速度,同时避免由于突然断电等导致的数据丢失。可以实现 Broker 功能的还有很多软件,例如 Kafka 等。
这里需要说明的是,在实际应用中,LogStash 自身并没有什么角色,只是根据不同的功能、不同的配置给出不同的称呼而已,无论是 Shipper 还是 Indexer,始终只做前面提到的三件事。
这里需要重点掌握的是 Logstash 中 Shipper 和 Indexer 的作用,因这两个部分是 Logstash 功能的核心,在下面的介绍中,会陆续介绍到这两个部分实现的功能细节。
4、Kibana 介绍
Kibano 是一个开源的数据分析可视化平台。使用 Kibana 可以为 Logstash 和 ElasticSearch 提供的日志数据进行高效的搜索、可视化汇总和多维度分析,还可以与 Elasticsearch 搜索引擎之中的数据进行交互。它基于浏览器的界面操作可以快速创建动态仪表板,实时监控 ElasticSearch 的数据状态与更改。
5、ELK 工作流程
一般都是在需要收集日志的所有服务上部署 Logstash,作为 Logstash Shipper 用于监控并收集、过滤日志,接着,将过滤后的日志发送给 Broker,然后,Logstash Indexer 将存放在 Broker 中的数据再写入 Elasticsearch,Elasticsearch 对这些数据创建索引,最后由 Kibana 对其进行各种分析并以图表的形式展示。
有些时候,如果收集的日志量较大,为了保证日志收集的性能和数据的完整性,Logstash Shipper 和Logstash Indexer 之间的缓冲器(Broker)也经常采用 Kafka 来实现。 在这个图中,要重点掌握的是 ELK 架构的数据流向,以及 Logstash、Elasticsearch 和 Kibana 组合实现的功能细节。
6、ELK 常见应用架构
6.1 简单的 ELK 应用架构
此架构主要是将 Logstash 部署在各个节点上搜集相关日志、数据,并经过分析、过滤后发送给远端服务器上的 Elasticsearch 进行存储。Elasticsearch 再将数据以分片的形式压缩存储,并提供多种 API 供用户查询、操作。用户可以通过 Kibana Web 直观的对日志进行查询,并根据需求生成数据报表。
此架构的优点是搭建简单,易于上手。缺点是 Logstash 消耗系统资源比较大,运行时占用 CPU 和内存资源较高。另外,由于没有消息队列缓存,可能存在数据丢失的风险。此架构建议供初学者或数据量小的环境使用。
6.2 典型 ELK 架构
此架构主要特点是引入了消息队列机制,位于各个节点上的 Logstash Agent(一级 Logstash,主要用来传输数据)先将数据传递给消息队列(常见的有 Kafka、Redis 等),接着,Logstash Server(二级 Logstash,主要用来拉取消息队列数据,过滤并分析数据)将格式化的数据传递给 Elasticsearch 进行存储。最后,由 Kibana 将日志和数据呈现给用户。由于引入了Kafka(或者 Redis)缓存机制,即使远端 Logstash Server 因故障停止运行,数据也不会丢失,因为数据已经被存储下来了。
这种架构适用于较大集群、数据量一般的应用环境,但由于二级 Logstash 要分析处理大量数据,同时 Elasticsearch 也要存储和索引大量数据,因此它们的负荷会比较重,解决的方法是将它们配置为集群模式,以分担负载。
此架构的优点在于引入了消息队列机制,均衡了网络传输,从而降低了网络闭塞尤其是丢失数据的可能性,但依然存在 Logstash 占用系统资源过多的问题,在海量数据应用场景下,可能会出现性能瓶颈。
6.3 ELK 集群架构
这个架构是在上面第二个架构基础上改进而来的,主要是将前端收集数据的 Logstash Agent 换成了Filebeat,消息队列使用了 Kafka 集群,然后将 Logstash 和 Elasticsearch 都通过集群模式进行构建,此架构适合大型集群、海量数据的业务场景,它通过将前端 Logstash Agent 替换成 Filebeat,有效降低了收集日志对业务系统资源的消耗。同时,消息队列使用 Kafka 集群架构,有效保障了收集数据的安全性和稳定性,而后端 Logstash 和 Elasticsearch 均采用集群模式搭建,从整体上提高了 ELK 系统的高效性、扩展性和吞吐量。
下面我们就以此架构为主介绍如何安装、配置、构建和使用 ELK 大数据日志分析系统。
7、ELK + Filebeat + Kafka + ZooKeeper 构建大数据日志分析平台
7.1 ELK 应用案例
7.2 环境与角色说明
7.2.1 服务器环境与角色
虚拟机统一由 multipass 安装,选用 Ubuntu 24.04 版本,各个服务器角色如下表所示:
IP地址 | 主机名 | 角色 | 所属集群 |
---|---|---|---|
192.168.64.21 | filebeat | 业务服务器 + Filebeat | 业务服务器集群 |
192.168.64.22 | kafka1 | Kafka + Zookeeper | Kafka Broker 集群 |
192.168.64.23 | kafka2 | Kafka + Zookeeper | |
192.168.64.24 | kafka3 | Kafka + Zookeeper | |
192.168.64.25 | logstash | Logstash | 数据转发 |
192.168.64.26 | elasticsearch1 | ES Master + Kibana | Elasticsearch 集群 |
192.168.64.27 | elasticsearch2 | ES Master + ES DataNode | |
192.168.64.28 | elasticsearch3 | ES Master + ES DataNode |
7.2.2 软件环境与版本
下表详细说明了安装软件对应的名称和版本号,其中,ELK 三款软件版本必须相同,这里选择的是 7.17.25 版本。Elasticsearch 7.7 之后才支持 aarch64 架构,Kibana 7.9 之后才支持 aarch64 架构,Logstash 7.10 之后才支持 aarch64 架构,Filebeat 7.12 之后才支持 aarch64 架构。
软件名称 | 版本 | x86_64 | aarch64 | 说明 |
---|---|---|---|---|
JDK | 11.0.24 | 11.0.24 | 11.0.24 | Java 环境解析器 |
Filebeat | 7.17.25 | 7.17.25 | 7.17.25 | 前端日志收集器 |
Logstash | 7.17.25 | 7.17.25 | 7.17.25 | 日志收集、过滤、转发 |
Zookeeper | 3.4.14 | 资源调度、协作 | ||
Kafka | 2.13-2.8.2 | 消息通信中间件 | ||
Elasticsearch | 7.17.25 | 7.17.25 | 7.17.25 | 日志存储 |
Kibana | 7.17.25 | 7.17.25 | 7.17.25 | 日志展示、分析 |
7.3 安装 JDK 及配置环境变量
1
2
vi /etc/profile
source /etc/profile
1、Elasticsearch 配置 Java 环境变量
1
export ES_JAVA_HOME=/usr/local/jdk/jdk-11.0.24
2、其他配置 Java 环境变量
1
2
3
4
JAVA_HOME=/usr/local/jdk/jdk-11.0.24
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tool.jar
export JAVA_HOME PATH CLASSPATH
7.4 安装并配置 Elasticsearch 集群
7.4.1 Elasticsearch 集群的架构与角色
在 Elasticsearch 的架构中,有三类角色,分别是 Client Node、Data Node 和 Master Node,搜索查询的请求一般是经过 Client Node 向 Data Node 获取数据,而索引查询首先请求 Master Node 节点,然后 Master Node 将请求分配到多个 Data Node 节点完成一次索引查询。
7.4.2 Elasticsearch 集群的安装及配置
1、安装与授权
由于 Elasticsearch 可以接收用户输入的脚本并且执行,为了系统安全考虑,需要创建一个单独的用户用来运行 Elasticsearch ,这里创建的普通用户是 es,操作如下:
1
useradd es
然后将 Elasticsearch 的安装目录都授权给 es 用户,操作如下:
1
chown -R es:es /usr/local/elasticsearch
2、操作系统调优
调整内核参数,将下面内容添加到 /etc/sysctl.conf
文件中:
1
2
fs.file-max=655360
vm.max_map_count=262144
fs.file-max
主要是配置系统最大打开文件描述符数,建议修改为 655360 或者更高;
vm.max_map_count
影响 JAVA 线程数量,用于限制一个进程可以拥有的 VMA(虚拟内存区域)大小,系统默认是 65530,建议修改成 262144 或者更高。
另外,还需要调整进程最大打开文件描述符(nofile)、最大用户进程数(nproc)和最大锁定内存地址空间(memlock),添加如下内容到 /etc/security/limits.conf
文件中:
1
2
3
4
5
6
root soft nproc 204800
root hard nproc 204800
root soft nofile 655360
root hard nofile 655360
root soft memlock unlimited
root hard memlock unlimited
CentOS 可以将 root
改为 *
;
CentOS 需要删除 /etc/security/limits.d/20-nproc.conf
文件;
退出 ssh 重新登录。
1
2
sysctl -p
ulimit -a
3、 JVM 调优
JVM 调优主要是针对 Elasticsearch 的 JVM 内存资源进行优化,Elasticsearch 的内存资源配置文件 jvm.options
,此文件位于 /usr/local/elasticsearch/elasticsearch-7.17.25/config
目录下,打开此文件,修改如下内容:
1
2
3
4
5
6
-Xms2g
-Xmx2g
# 8-13:-XX:+UseConcMarkSweepGC
# 8-13:-XX:CMSInitiatingOccupancyFraction=75
# 8-13:-XX:+UseCMSInitiatingOccupancyOnly
可以看到,默认 JVM 内存为 4g,可根据服务器内存大小,修改为合适的值。一般设置为服务器物理内存的一半最佳。
4、配置 Elasticsearch
Elasticsearch 的配置文件均在 Elasticsearch 根目录下的 config 文件夹中,这里是 /usr/local/elasticsearch/elasticsearch-7.17.25/config
目录,主要有 jvm.options、elasticsearch.yml 和 log4j2.properties 三个主要配置文件。这里重点介绍 elasticsearch.yml 一些重要的配置项及其含义。这里配置的 elasticsearch.yml 文件内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 集群名称
cluster.name: elkbigdata
# 节点名称
node.name: server1
# 数据文件目录
path.data: /data1/elasticsearch,/data2/elasticsearch
# 日志文件目录
path.logs: /usr/local/elasticsearch/elasticsearch-7.17.25/logs
# 提供服务的IP地址
network.host: 0.0.0.0
# 提供HTTP服务的端口
http.port: 9200
# 发现集群中的其他节点
discovery.seed_hosts: ["192.168.64.26", "192.168.64.27", "192.168.64.28"]
# 参与主节点选举的候选节点列表
cluster.initial_master_nodes: ["server1","server2","server3"]
# 锁定其进程所使用的内存,以防止操作系统将这部分内存交换(swap)到磁盘上,从而确保 ES 的性能和稳定性。
# swap 是一种用于辅助内存管理的机制。它涉及到将一部分内存(通常是物理内存,也称为 RAM)中的数据暂时移动到磁盘上的一个专门区域,以便为当前需要更多内存资源的进程腾出空间。
# bootstrap.memory_lock: true
1
2
3
4
5
6
7
8
9
10
11
mkdir -p /data1/elasticsearch
mkdir -p /data2/elasticsearch
chown -R es:es /data1/elasticsearch
chown -R es:es /data2/elasticsearch
chown -R es:es /usr/local/elasticsearch
cd /usr/local/elasticsearch/elasticsearch-7.17.25
su es
./bin/elasticsearch -d
exit
netstat -nlp | grep 9200
tail -f ./logs/elkbigdata.log
集群的所有节点都启动后,查看集群的健康情况。如果某个节点的健康状态是黄色的,删除数据重新启动。
1
2
curl http://192.168.64.26:9200
curl -X GET "http://192.168.64.26:9200/_cluster/health?pretty"
7.5 安装并配置 Zookeeper 集群
对于集群模式下的 ZooKeeper 部署,官方建议至少要三台服务器,关于服务器的数量,推荐是奇数个(3、5、7、9 等等),以实现 ZooKeeper 集群的高可用,这里使用三台服务器进行部署。
1、 ZooKeeper 概念介绍
在介绍 ZooKeeper 之前,先来介绍一下分布式协调技术,所谓分布式协调技术主要是用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种共享资源,防止造成资源竞争(脑裂)的后果。
这里首先介绍下什么是分布式系统,所谓分布式系统就是在不同地域分布的多个服务器,共同组成的一个应用系统来为用户提供服务,在分布式系统中最重要的是进程调度。
假设有一个分布在三个地域的服务器组成的一个应用系统,在第一台机器上挂载了一个资源,然后这三个地域分布的应用进程都要竞争这个资源,但我们又不希望多个进程同时进行访问,这个时候就需要一个协调器,来让它们有序的来访问这个资源。这个协调器就是分布式系统中经常提到的那个 “锁”。
例如“进程1“在使用该资源的时候,会先去获得这把锁,“进程1”获得锁以后会对该资源保持独占,此时其它进程就无法访问该资源,“进程1”在用完该资源以后会将该锁释放掉,以便让其它进程来获得锁。由此可见,通过这个“锁”机制,就可以保证分布式系统中多个进程能够有序的访问该共享资源。这里把这个分布式环境下的这个“锁”叫作分布式锁。这个分布式锁就是分布式协调技术实现的核心内容。
综上所述,Zo okeeper 是一种为分布式应用所设计的高可用、高性能的开源协调服务,它提供了一项基本服务:分布式锁服务,同时,也提供了数据的维护和管理机制,如:统一命名服务、状态同步服务、集群管理、分布式消息队列、分布式应用配置项的管理等等。
2、ZooKeeper 应用举例
这里以 ZooKeeper 提供的基本服务分布式锁为例进行介绍。在分布式锁服务中,有一种最典型应用场景,就是通过对集群进行 Master 角色的选举,来解决分布式系统中的单点故障问题。所谓单点故障,就是在一个主从的分布式系统中,主节点负责任务调度分发,从节点负责任务的处理,而当主节点发生故障时,整个应用系统也就瘫痪了,那么这种故障就称为单点故障。
解决单点故障,传统的方式是采用一个备用节点,这个备用节点定期向主节点发送 Ping 包,主节点收到 Ping 包以后向备用节点发送回复 Ack 信息,当备用节点收到回复的时候就会认为当前主节点运行正常,让它继续提供服务。而当主节点故障时,备用节点就无法收到回复信息了,此时,备用节点就认为主节点宕机,然后接替它成为新的主节点继续提供服务。
3、ZooKeeper 工作原理
下面通过三种情形,介绍一下 Zookeeper 是如何进行工作的。
(1) Master 启动
在分布式系统中引入 Zookeeper 以后,就可以配置多个主节点,这里以配置两个主节点为例,假定它们是“主节点A”和”主节点B”,当两个主节点都启动后,它们都会向 ZooKeeper 中注册节点信息。我们假设”主节点A”锁注册的节点信息是“master00001”,”主节点B”注册的节点信息是”master00002”,注册完以后会进行选举,选举有多种算法,这里以编号最小作为选举算法,那么编号最小的节点将在选举中获胜 并获得锁成为主节点,也就是“主节点A”将会获得锁成为主节点,然后”主节点B”将被阻塞成为一个备用节点。这样,通过这种方式 Zookeeper 就完成了对两个 Master 进程的调度。完成了主、备节点的分配和协作。
(2)Master 故障
如果”主节点A”发生了故障,这时候它在 Zookeeper 所注册的节点信息会被自动删除,而 Zookeeper 会自动感知节点的变化,发现“主节点A”故障后,会再次发出选举,这时候”主节点B”将在选举中获胜,替代”主节点A”成为新的主节点,这样就完成了主节点的重新选举。
(3)Master 恢复
如果主节点恢复了,它会再次向 ZooKeeper 注册自身的节点信息,只不过这时候它注册的节点信息将会变成”master00003”,而不是原来的信息。ZooKeeper 会感知节点的变化再次发动选举,这时候”主节点B”在选举中会再次获胜继续担任”主节点”,”主节点A”会担任备用节点。
Zookeeper 就是通过这样的协调、调度机制如此反复的对集群进行管理和状态同步的。
4、ZooKeeper 集群架构
Zookeeper 一般是通过集群架构来提供服务的,下图是 Zookeeper 的基本架构图。
Zookeeper 集群主要角色有 Server 和 Client,其中,Server 又分为 Leader、Follower 和 Observer三个角色,每个角色的含义如下:
- Leader:领导者角色,主要负责投票的发起和决议,以及更新系统状态。
- Follower:跟随者角色,用于接收客户端的请求并返回结果给客户端,在选举过程中参与投票。
- Observer:观察者角色,用户接收客户端的请求,并将写请求转发给 Leader,同时同步 Leader 状态,但不参与投票。Observer 目的是扩展系统,提高伸缩性。
- Client:客户端角色,用于向 Zookeeper 发起请求。
Zookeeper 集群中每个 Server 在内存中存储了一份数据,在 Zookeeper 启动时,将从实例中选举一个 Server 作为 Leader,Leader 负责处理数据更新等操作,当且仅当大多数 Server 在内存中成功修改数据,才认为数据修改成功。
Lookeeper 写的流程为:客户端 Client 首先和一个 Server 或者 Observe 通信,发起写请求,然后 Server 将写请求转发给 Leader,Leader 再将写请求转发给其它 Server,其它 Server 在接收到写请求后写入数据并响应 Leader,Leader 在接收到大多数写成功回应后,认为数据写成功,最后响应 Client,完成一次写操作过程。
5、配置 Zookeeper
Zookeeper 的配置模板文件 /usr/local/zookeeper/zookeeper-3.4.14/conf/zoo_sample.cfg
,拷贝 zoo_sample.cfg
并重命名为 zoo.cfg
,重点配置如下内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 心跳 2s(度量单位)
tickTime=2000
# Follower 节点连接到 Leader 节点的超时时间为 10 个心跳时间(tickTime)20s
initLimit=10
# Follower 节点与 Leader 节点间消息传递请求与应答的超时时间为 5 个心跳时间(tickTime)10s
syncLimit=5
# 数据文件和日志文件的路径,可通过 dataLogDir 单独指定日志文件路径
dataDir=/data/zookeeper
# 端口
clientPort=2181
# 集群中服务器的信息,2888 代表此服务器与集群中 Leader 服务器通信的端口,3888 选举时通信的端口
server.1=192.168.64.22:2888:3888
server.2=192.168.64.23:2888:3888
server.3=192.168.64.24:2888:3888
1
mkdir -p /data/zookeeper
除了修改 zoo.cfg
配置文件外,集群模式下还需要配置一个文件 myid
,这个文件需要放在 dataDir
配置项指定的目录下,这个文件里面只有一个数字,如果要写入 1,表示第一个服务器,与 zoo.cfg
文件中的 server.1 中的 1 对应,以此类推。Zookeeper 在启动时会读取这个文件,得到里面的数据与 zoo.cfg
里面的配置信息比较,从而判断每个 Zookeeper Server 的对应关系。
为了保证 Zookeeper 集群配置的规范性,建议将 Zookeeper 集群中每台服务器的安装路径和数据文件路径都保存一致。
1
vi /data/zookeeper/myid
1
2
3
4
cd /usr/local/zookeeper/zookeeper-3.4.14
./bin/zkServer.sh start
netstat -nlp | grep 2181
tail -f ./zookeeper.out
7.6 安装并配置 Kafka Broker 集群
1、Kafka 基本概念
Kafka 是一种高吞吐量的分布式发布/订阅消息系统,这是官方对 Kafka 的定义,Kafka 是 Apache 组织下的一个开源系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于 hadoop 平台的数据分析、低时延的实时系统、storm/spark 流式处理引擎等。Kafka 现在它已被多家大型公司作为多种类型的数据管道和消息系统使用。
2、Kafka 角色术语
在介绍架构之前,先了解下 Kafka 中一些核心概念和各种角色。
- Broker:Kafka 集群包含一个或多个服务器,每个服务器被称 Broker。
- Topic:每条发布到 Kafka 集群的消息都有一个分类,这个类别被称为 Topic(主题)。
- Producer:指消息的生产者,负责发布消息到 Kafka Broker。
- Consumer:指消息的消费者,从 Kafka Broker 拉取数据,并消费这些已发布的消息。
- Partition:Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition,每个 Partition 都是一个有序的队列。Partition 中的每条消息都会被分配一个有序的 ID(称 Offset)。
- Consumer Group:消费者组,可以给每个 Consumer 指定消费者组,若不指定消费者组,则属于默认的 Group。
- Message:消息,通信的基本单位,每个 Producer 可以向一个 Topic 发布一些消息。
3、Kafka 拓扑架构
一个典型的 Kafka 集群包含若干 Producer,若干 Broker、若干 Consumer Group,以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置,选举 Leader,以及在 Consumer Group 发生变化时进行 rebalance。Producer 使用 push 模式将消息发布到 Broker,Consumer 使用 pull 模式从 Broker 订阅并消费消息。
4、Topic 与 Partition
Kafka 中的 Topic 是以 Partition 的形式存放的,每一个 Topic 都可以设置它的 Partition 数量,Partition 的数量决定了组成 Topic 的 log 数量。推荐 Partition 的数量一定要大于同时运行的 Consumer 的数量。另外,建议 Partition 的数量大于集群 Broker 的数量,这样消息数据就可以均匀的分布在各个 Broker 中。
那么,Topic 为什么要设置多个 Partition 呢,这是因为 Kafka 是基于文件存储的,通过配置多个 Partition 可以将消息内容分散存储到多个 Broker 上,这样可以避免文件尺寸达到单机磁盘的上限。同时,将一个 Topic 切分成任意多个 Partitions,可以保证消息存储、消息消费的效率,因为越多的 Partitions 可以容纳更多的 Consumer,可有效提升 Kafka 的吞吐率。因此将 Topic 切分成多个 Partitions 的好处是可以将大量的消息分成多批数据同时写到不同节点上,将写请求分担负载到各个集群 节点。
5、Kafka 消息发送机制
每当用户往某个 Topic 发送数据时,数据会被 hash 到不同的 Partition,这些 Partition 位于不同的集群节点上,所以每个消息都会被记录一个 offset 消息号,就是 offset 号。消费者通过这个 offset 号去查询读取这个消息。
发送消息流程为:
首先获取 Topic 的所有 Patition,如果客户端不指定 Patition,也没有指定 Key 的话,使用自增长的数字取余数的方式实现指定的 Partition。这样 Kafka 将平均的向 Partition 中生产数据。如果想要控制发送的 Partition,则有两种方式,一种是指定 Partition,另一种就是根据 Key 自己写算法。实现其 Partition 方法。
每一条消息被发送到 Broker 时,会根据 Paritition 规则选择被存储到哪一个 Partition。如果 Partition 规则设置的合理,所有消息可以均匀分布到不同的 Partition 里,这样就实现了水平扩展。同时,每条消息被 append 到 Partition 中时,是顺序写入磁盘的,因此效率非常高,经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保证。
6、Kafka 消息消费机制
Kafka 中的 Producer 和 Consumer 采用的是 push(推送)、pull(拉取)的模式,即 Producer 只是向 Broker push 消息,Consumer 只是从 Broker pull 消息,push 和 pull 对于消息的生产和消费是异步进行的。pull 模式的一个好处是 Consumer 可自主控制消费消息的速率,同时 Consumer 还可以自己控制消费消息的方式是批量的从 Broker 拉取数据还是逐条消费数据。
当生产者将数据发布到 Topic 时,消费者通过 pull 的方式,定期从服务器拉取数据,当然在 pull 数据的时候,服务器会告诉 Consumer 可消费的消息 Offset。
消费规则:
- 不同 Consumer Group 下的消费者可以消费 partition 中相同的消息,相同的 Consumer Group 下的消费者只能消费 partition 中不同的数据。
- Topic 的 Partition 的个数和同一个消费组的消费者个数最好一致,如果消费者个数多于 Partition 个数,则会存在有的消费者消费不到数据。
- 服务器会记录每个 Consumer 在每个 Topic 的每个 Partition 下消费的 Offset,然后每次去消费拉取数据时,都会从上次记录的位置开始拉取数据。
7、Kafka 消息存储机制
在存储结构上,每个 Partition 在物理上对应一个文件夹,该文件夹下存储这个 Partition 的所有消息和索引文件,每个 Partition(目录)相当于一个巨型文件被平均分配到多个大小相等 Segment(段)数据文件中。
Partiton 命名规则为 Topic 名称 + 序号,第一个 Partiton 序号从 0 开始,序号最大值为 Partitions 数量减 1。
在每个 Partiton(文件夹)中有多个大小相等的 Segment(段)数据文件,每个 Segment 的大小是相同的,但是每条消息的大小可能不相同,因此 Segment 数据文件中消息数量不一定相等。
Segment 数据文件有两个部分组成,分别为 index file 和 data file,这两个文件是一一对应,成对出现,后缀”.index”和”.log”的文件分别表示为 Segment 索引文件和数据文件。
其实 Kafka 最核心的思想是使用磁盘,而不是使用内存,使用磁盘操作有以下几个好处:
- 磁盘缓存由 Linux 系统维护,减少了程序员的不少工作。
- 磁盘顺序读写速度超过内存随机读写。
- JVM 的 GC 效率低,内存占用大。使用磁盘可以避免这一问题。
- 系统冷启动后,磁盘缓存依然可用。
8、配置
Kafka 的主配置文件为 /usr/local/kafka/kafka_2.13-2.8.2/config/server.properties
,这里以节点 kafka1 为例,重点介绍一些常用配置项的含义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 标识
broker.id=1
# 监听地址
listeners=PLAINTEXT://192.168.64.22:9092
# 数据文件,在多路径情况下,Kafka 在分配 Partition 时会根据最少被使用原则选择目录分配,不是根据磁盘大小而是根据 Partition 的个数
log.dirs=/data1/kafka/logs,/data2/kafka/logs
# Partition 数量,最好是大于等于消费者的数量
num.partitions=6
# 数据的保存时间,log.retention.hours、log.retention.minutes 和 log.retention.ms(推荐)
log.retention.hours=168
# 段的大小
log.segment.bytes=1073741824
# Zookeeper 集群信息
zookeeper.connect=192.168.64.22:2181,192.168.64.23:2181,192.168.64.24:2181
# 开启自动创建 Topic
auto.create.topics.enable=true
# 开启自动删除 Topic(false:逻辑删除,ture:物理删除)
delete.topic.enable=true
1
2
mkdir -p /data1/kafka/logs
mkdir -p /data2/kafka/logs
1
2
3
4
cd /usr/local/kafka/kafka_2.13-2.8.2/
nohup ./bin/kafka-server-start.sh ./config/server.properties &
netstat -nlp | grep 9092
tail -f ./nohup.out
9、常用命令
Kafka 提供了多个命令用于查看、创建、修改、删除 Topic 信息,也可以通过命令测试如何生产消息、消费消息等,这些命令位于 Kafka 安装目录的 bin 目录下,这里是 /usr/local/kafka/kafka_2.13-2.8.2/bin
。登录任意一台 Kafka 集群节点,切换到此目录下,即可进行命令操作。下面列举一些常用命令的使用方法。
(1)显示 Topic 列表
1
./bin/kafka-topics.sh --zookeeper 192.168.64.22:2181,192.168.64.23:2181,192.168.64.24:2181 --list
(2)创建一个 Topic,并指定 Topic 属性(副本数、分区数等)
1
./bin/kafka-topics.sh --zookeeper 192.168.64.22:2181,192.168.64.23:2181,192.168.64.24:2181 --topic topic1 --create --replication-factor 1 --partitions 3
(3)查看某个 Topic 的状态
1
./bin/kafka-topics.sh --zookeeper 192.168.64.22:2181,192.168.64.23:2181,192.168.64.24:2181 --topic topic1 --describe
(4) 生产消息
1
./bin/kafka-console-producer.sh --broker-list 192.168.64.22:9092,192.168.64.23:9092,192.168.64.24:9092 --topic topic1
(5)消费消息
1
2
3
4
# 旧版
./bin/kafka-console-consumer.sh --zookeeper 192.168.64.22:2181,192.168.64.23:2181,192.168.64.24:2181 --topic topic1
# 新版
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.64.22:9092,192.168.64.23:9092,192.168.64.24:9092 --topic topic1 --from-beginning
(6)删除 Topic
1
./bin/kafka-topics.sh --zookeeper 192.168.64.22:2181,192.168.64.23:2181,192.168.64.24:2181 --topic topic1 --delete
7.7 安装并配置 Filebeat
1、什么是 Filebeat
Filebeat 是一个开源的文本日志收集器,它是 Elastic 公司 Beats 数据采集产品的一个子产品,采用 go 语言开发,一般安装在业务服务器上作为代理来监测日志目录或特定的日志文件,并把它们发送到 Logstash、Elasticsearch、Redis 或 Kafka 等。
2、Filebeat 架构与运行原理
Filebeat 是一个轻量级的日志监测、传输工具,它最大的特点是性能稳定、配置简单、占用系统资源很少。这也是强烈推荐 Filebeat 的原因。下图是官方给出的 Filebeat 架构图:
从图中可以看出,Filebeat 主要由两个组件构成:Prospector(Input 探测器)和 Harvester(收集器)。这两类组件一起协作完成 Filebeat 的工作。
其中,Harvester 负责进行单个文件的内容收集,在运行过程中,每一个 Harvester 会对一个文件进行内容逐行读取,并且把读到的内容发送到配置的 Output 中。当 Harvester 开始进行文件的读取后,将会负责这个文件的打开和关闭操作,因此,在 Harvester 运行过程中,文件都处于打开状态。如果在收集过程中,删除了这个文件或者是对文件进行了重命名,Filebeat 依然会继续对这个文件进行读取,这时候将会一直占用着文件所对应的磁盘空间,直到 Harvester 关闭。
Prospector 负责管理 Harvester,它会找到所有需要进行读取的数据源。然后交给 Harvester 进行内容收集,如果 input type 配置的是 log 类型,Prospector 将会去配置路径下查找所有能匹配上的文件,然后每一个文件创建一个 Harvester。
综上所述,Filebeat 的工作流程:当开启 Filebeat 程序的时候,它会启动一个或多个探测器(Prospector)去检测指定的日志目录或文件,对于探测器找出的每一个日志文件,Filebeat 会启动收集进程(Harvester),每一个收集进程读取一个日志文件的内容,然后将这些日志数据发送到后台处理程序(Spooler),后台处理程序会集合这些事件,最后发送集合的数据到 Output 指定的目的地。
3、为什么要使用 Filebeat
Logstash 功能虽然强大,但是它依赖 Java、在数据量大的时候,Logstash 进程会消耗过多的系统资源,这将严重影响业务系统的性能,而 Filebeat 就是一个完美的替代者,Filebeat 是 Beat 成员之一,基于GO语言,没有任何依赖,配置文件简单,格式明了,同时,Filebeat 比 Logstash 更加轻量,所占用系统资源极少,非常适合安装在生产机器上。
4、配置 Filebeat
Filebeat 的配置文件目录 /usr/local/filebeat/filebeat-7.17.25-linux-arm64/filebeat.yml
,常用配置项内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 定义数据原型
filebeat.inputs:
# 数据的输入类型,log:日志
- type: log
# 关闭采用模块的方式配置,启用手工配置
enabled: true
# 要监控的日志文件,支持模糊匹配
paths:
- /var/log/auth.log
# 自定义列
fields:
log_topic: os_auth
# 主机 IP
name: "192.168.64.21"
# 输出 Kafka
output.kafka:
# 是否启用
enabled: true
# 集群
hosts: ["192.168.64.22:9092","192.168.64.23:9092","192.168.64.24:9092"]
# Topic
topic: '%{[fields.log_topic]}'
# 选择 Partition 的方式:轮询(最公平的一种方式)
partition.round_robin:
reachable_only: true
# 表示 Filebeat 会启动 2 个并发的 worker(工作线程)来向 Kafka 发送数据
worker: 2
# 指定生产者向 Kafka 发送消息时的确认模式,取值:0|1|-1
# 0:生产者不等待任何确认。
# 1:生产者需要从 Leader 分区收到确认后,才会认为消息已成功发送。
# -1:生产者需要等待所有副本(Leader、Follower)都确认收到消息后,才会认为发送成功。这是最安全的模式,可以保证即使 Leader 崩溃,也不会丢失数据,但延迟较高。
required_acks: 1
# 数据压缩方式
compression: gzip
# 单条消息最大 10 M
max_message_bytes: 10485760
processors:
- drop_fields:
fields: ["input","ecs","agent","host"]
ignore_missing: false
# 日志级别:error|warning|info|debug
logging.level: debug
1
2
3
4
cd /usr/local/filebeat/filebeat-7.17.25-linux-arm64
nohup ./filebeat -e -c filebeat.yml &
ps -ef | grep filebeat
tail -f nohup.out
7.8 安装并配置 Logstash 服务
1、Logstash 是怎么工作的
Logstash 是一个开源的、服务端的数据处理 pipeline(管道),它可以接收多个源的数据、然后对它们进行转换、最终将它们发送到指定类型的目的地。Logstash 是通过插件机制实现各种功能的,可以在 https://github.com/logstash-plugins 下载各种功能的插件,也可以自行编写插件。
Logstash 实现的功能主要分为接收数据、解析过滤并转换数据、输出数据三个部分,对应的插件依次是 input 插件、filter 插件、output 插件,其中,filter 插件是可选的,其它两个是必须插件。也就是说在一个完整的 Logstash 配置文件中,必须有 input 插件和 output 插件。
input 插件主要用于接收数据,Logstash 支持接收多种数据源,常用的有如下几种:
file:读取一个文件,这个读取功能有点类似于 linux 下面的 tail 命令,一行一行的实时读取。
syslog:监听系统 514 端口的 syslog messages,并使用 RFC3164 格式进行解析。
redis:Logstash 可以从 redis 服务器读取数据,此时 redis 类似于一个消息缓存组件。
kafka:Logstash 也可以从 kafka 集群中读取数据,kafka 加 Logstash 的架构一般用在数据量较大的业务场景,kafka 可用作数据的缓冲和存储。
filebeat:fiebeat 是一个文本日志收集器,性能稳定,并且占用系统资源很少,Logstash 可以接收 flebeat 发送过来的数据。
filter 插件主要用于数据的过滤、解析和格式化,也就是将非结构化的数据解析成结构化的、可查询的标准化数据,常用的 filter 插件有如下几个:
grok:grok 是 Logstash 最重要的插件,可解析并结构化任意数据,支持正则表达式,并提供了很多内置的规则和模板可供使用。此插件使用最多,但也最复杂。
mutate:此插件提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。
date:此插件可以用来转换你的日志记录中的时间字符串。
GeoIP:此插件可以根据 IP 地址提供对应的地域信息,包括国家,省,市,经纬度等,对于可视化地图和区域统计非常有用。
output 插件用于数据的输出,一个 Logstash 事件可以穿过多个 output,直到所有的 output 处理完毕,这个事件才算结束。输出插件常见的有如下几种:
elasticsearch:发送数据到 elasticsearch。
file:发送数据到文件中。
redis:发送数据到 redis 中,从这里可以看出,redis 插件既可以用在 input 插件中,也可以用在 output 插件中。
kafka:发送数据到 kafka 中,与 redis 插件类似,此插件也可以用在 Logstash 的输入和输出插件中。
5、Logstash 配置文件入门
Logstash 的配置文件目录为 /usr/local/logstash/logstash-7.17.25/config
, 其中,jvm.options
是设置 JVM 内存资源的配置文件,logstash.yml
是 Logstash 全局属性配置文件,另外还需要自己创建一个
Logstash 事件配置文件,这里介绍下 Logstash 事件配置文件的编写方法和使用方式。
在介绍 Logstash 配置之前,先来认识一下 Logstash 是如何实现输入和输出的。Logstash 提供了一个 shell 脚本 /usr/local/logstash/logstash-7.17.25/bin/logstash
可以方便快速的启动一个 Logstash 进程,在 Linux 命令行下,运行如下命令启动 Logstash 进程:
1
2
cd /usr/local/logstash/logstash-7.17.25
./bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'
首先解释下这条命令的含义:
-e 代表执行的意思。
input 即输入的意思,input 里面即是输入的方式,这里选择了 stdin,就是标准输入(从终端输人)。
output 即输出的意思,output 里面是输出的方式,这里选择了 stdout,就是标准输出(输出到终端)。
codec 是个插件,表明格式。这里放在 stdout 中,表示输出的格式,rubydebug 是专门用来做测试的格式,一般用来在终端输出 JSON 格式。
1
2
3
nohup ./bin/logstash -f ./config/logstash-1.conf &
ps -ef | grep logstash
tail -f nohup.out
文件输入、Kafka 输出
Filebeat 输入、标准输出和 Kafka 输出
言归正传,Kafka 输入、Elasticsearch 输出
1
nohup ./bin/logstash -f ./config/logstash-kafka-elasticsearch.conf &
7.9 安装并配置 Kibana 展示日志数据
1、配置 Kibana
Kibana 的配置文件 /usr/local/kibana/kibana-7.17.25-linux-aarch64/config/kibana.yml
,Kibana 的配置非常简单,常用的配置项内容如下:
1
2
3
4
5
6
7
8
# 监听端口
server.port: 5601
# 绑定 IP 地址,如果内网访问,设置为内网地址即可
server.host: "192.168.64.26"
# Kibana 访问 Elasticsearch 的地址
elasticsearch.hosts:["http://192.168.64.26:9200","http://192.168.64.27:9200","http://192.168.64.28:9200"]
# 用于存储 Kibana 数据信息的索引
kibana.index: ".kibana"
1
2
3
4
cd /usr/local/kibana/kibana-7.17.25-linux-aarch64
nohup ./bin/kibana --allow-root &
netstat -nlp | grep 5601
tail -f nohup.out
http://192.168.64.26:5601
7.10 调试并验证日志数据流向
经过上面的配置过程,大数据日志分析平台已经基本构建完成。
8、Logstash 配置语法详解
8.1 Logstash 基本语法组成
Logstash 之所以功能强大和流行,与其丰富的过滤器插件是分不开的,过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的事件到后续流程中。
Logstash 配置文件有如下三部分组成,其中 input、output 部分是必须配置,filter 部分是可选配置,而 filter 就是过滤器插件,可以在这部分实现各种日志过滤功能。
1
2
3
4
5
6
7
8
9
input {
#输入插件
}
filter {
#过滤匹配插件
}
output {
#输出插件
}
8.2 Logstash 输入插件(Input)
1、读取文件(File)
Logstash 使用一个名 filewatch 的 ruby gem 库来监听文件变化,并通过一个叫 .sincedb 的数据库文件来记录被监听的日志文件的读取进度(时间戳),这个 .sincedb 数据文件的默认路径在 <path.data>/plugins/inputs/file
下面,文件名类似于 .sincedb_452905a167cf4509fd08acb964fdb20c
,而
看下面一个事件配置文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
input {
file {
path => ["/var/log/messages"]
type => "system"
start_position => "beginning"
}
}
output {
stdout {
codec => rubydebug
}
}
这个配置是监听并接收本机的 /var/log/messages
文件内容,start_position 表示按时间戳记录的地方开始读取,如果没有时间戳则从头开始读取,有点类似 cat 命令,默认情况下,Logstash 会从文件的结束位置开始读取数据,也就是说 Logstash 进程会以类似 tail -f 命令的形式逐行获取数据。type 用来标记事件类型,通常会在输人区域通过 type 标记事件类型。
2、标准输入(Stdin)
stdin 是从标准输入获取信息,下面是一个关于 stdin 的事件配置文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
input {
stdin {
add_field => {"Key"=>"Value"}
tags => ["tag1"]
type => "type"
}
}
output {
stdout {
codec => rubydebug
}
}
3、读取 Syslog 日志
如何将 rsyslog 收集到的日志信息发送到 Logstash 中,需要做如下两个步骤的操作:
首先,在需要收集日志的服务器上找到 rsyslog 的配置文件 /etc/rsyslog.conf
添加如下内容:
1
*.* @@192.168.64.25:5514
其中,192.168.64.25 是 Logstash 服务器的地址,5514 是 Logstash 启动的监听端口。
接着,重启 rsyslog 服务:
1
systemctl restart rsyslog
然后,在 logstash 服务器上创建一个事件配置文件,内容如下:
1
2
3
4
5
6
7
8
9
10
11
input {
syslog {
port => "5514"
}
}
output {
staout {
codec => rubydebg
}
}
4、读取 TCP 网络数据
下面的配置文件就是通过 Logstash:Inputs:TCP
和 Logstash:Fiters:Grok
配合实现 syslog 功能的例子,这里使用了 Logstash 的 TCP/UDP 插件读取网络数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
input {
tcp {
port => "5514"
}
}
filter {
grok {
match => {"message" => "%{SYSLOGLINE}"}
}
}
output {
stdout {
codec => rubydebug
}
}
其中,5514 端口是 Logstash 启动的TCP 监听端口。
8.3 Logstash 编码插件(Codec)
其实我们已经用过编码插件 codec 了,也就是这个 rubydebug,它就是一种 codec,虽然它一般只会用在 stdout 插件中,作为配置测试或者调试的工具。
编码插件(Codec)可以在 Logstash 输入或输出时处理不同类型的数据,因此,Logstash 不只是一个input –> fller –> output 的数据流,而是一个 input –> decode –> filter –> encode –> output 的数据流。
Codec 支持的编码格式常见的有 plain、json、json_lines 等。下面依次介绍。
1、plain plain 是一个空的解析器,它可以让用户自己指定格式,也就是说输入是什么格式,输出就是什么格式。下面是一个包含 plain 编码的配置文件:
1
2
3
4
5
6
7
8
9
10
11
input{
stdin {
}
}
output {
stdout {
codec => "plain"
}
}
2、ison、json_lines
如果发送给 Logstash 的数据内容为 json 格式,可以在 input 字段加入 codec => json 来进行解析,这样就可以根据具体内容生成字段,方便分析和储存。如果想让 Logstash 输出为 json 格式,可以在 output 字段加入 codec => json,下面是一个包含 json 编码的配置文件:
1
2
3
4
5
6
7
8
9
10
11
input{
stdin {
}
}
output {
stdout {
codec => "json"
}
}
这就是 json 格式的输出,可以看出,json 每个字段是 key:value 格式,多个字段之间通过逗号分隔。有时候,如果 json 文件比较长,需要换行的话,那么就要用 json_lines 编码格式了。
8.4 Logstash 过滤器插件(Filter)
1、Grok 正则捕获
Grok 是一个十分强大的 Logstash Filter 插件,他可以通过正则解析任意文本,将非结构化日志数据转化成结构化日志数据以方便查询。他是目前 Logstash 中解析非结构化日志数据最好的方式。
Grok 的语法规则是:%{语法:语义}
,“语法”就是匹配的模式,例如使用 NUMBER 模式可以匹配出数字,IP 模式则会匹配出像 127.0.0.1 这样的 IP 地址。
例如输入的内容为:
1
192.168.64.25 [20/Dec/2024:22:22:22 +0800] "GET / HTTP/1.1" 403 5039
那么,%{IP:clientip}
匹配模式的结果:clientip: 192.168.64.25
,%{HTTPDATE:timestamp}
匹配模式的结果为:timestamp:20/Dec/2024:22:22:22 +0800
,%{QS:referrer}
匹配模式的结果为:referrer:"GET / HTTP/1.1"
。
调试工具:Grok Debugger
下面是一个组合匹配模式,它可以获取上面输入的所有内容:
1
%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}
通过这个组合匹配模式,可以将输入的内容分成了五个部分,即五个字段,将输入内容分割为不同的数据字段,这对于日后解析和查询日志数据非常有用,这正是使用 Grok 的自的。
Logstash 默认提供了近 200 个匹配模式(其实就是定义好的正则表达式)让我们来使用,可以在 Logstash 的安装目录下,例如在 /usr/local/logstash/logstash-7.17.25/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.4/patterns
目录里面查看,基本定义在 grok-patterns
文件中。
从这些定义好的匹配模式中,可以查到上面使用的四个匹配模式对应的定义规则:
匹配模式 | 正则定义规则 |
---|---|
IP | (?:%{IPV6}|%{IPV4}) |
HTTPDATE | %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT} |
QS | %{QUOTEDSTRING} |
NUMBER | (?:%{BASE10NUM}) |
2、时间处理(Date)
时间字段 | 字母 | 含义 |
---|---|---|
年 | yyyy | 表示四位数年份。例如:2024 |
年 | yy | 表示两位数年份。例如:2024年即为24 |
月 | M | 表示一位数月份。例如:1月份为1,12月份为12 |
月 | MM | 表示两位数月份。例如:1月份为01,12月份为12 |
月 | MMM | 表示缩短的月份文本,例如:1月份为Jan,12月份为Dec |
月 | MMMM | 表示全拼的月份文本,例如:1月份为Janvary,12月份为December |
日 | d | 表示1位数的几号,例如:1表示某月1号 |
日 | dd | 表示2位数的几号,例如:01表示某月1号 |
时 | H | 表示1位数的小时,例如:1表示凌晨1点 |
时 | HH | 表示2位数的小时,例如:01表示凌晨1点 |
分 | m | 表示1位数的分钟,例如:1表示某点1分 |
分 | mm | 表示2位数的分钟,例如:01表示某点1分 |
秒 | s | 表示1位数的秒,例如:1表示某点某分1秒 |
秒 | ss | 表示2位数的秒,例如:01表示某点某分1秒 |
时区 | Z | 表示时区偏移,结构为HHmm,例如:+0800 |
时区 | ZZ | 表示时区偏移,结构为HH:mm,例如:+08:00 |
时区 | ZZZ | 表示时区身份,例如:Asia/Shanghai |
3、数据修改(Mutate)
(1)正则表达式替换匹配字段
gsub 可以通过正则表达式替换字段中匹配到的值,只对字符串字段有效,下面是一个关于 mutate 插件中 gsub 的示例(仅列出 filter 部分):
1
2
3
4
5
filter {
mutate {
gsub => ["referrer",'"',""]
}
}
这个示例表示将 referrer 字段中所有的双引号字符替换为空。
(2)分隔符分割字符串为数组
split 可以通过指定的分隔符分割字段中的字符串为数组,下面是一个关于 mutate 插件中 split 的示例(仅列出 filter 部分):
1
2
3
4
5
filter {
mutate {
split => ["referrer"," "]
}
}
这个示例表示将 clientip 字段以空格分隔为数组。
(3)重命名字段
rename 可以实现重命名某个字段的功能,下面是一个关于 mutate 插件中 rename 的示例(仅列出 filter 部分):
1
2
3
4
5
filter {
mutate {
rename => ["clientip","clientIp"]
}
}
这个示例表示将字段 clientip 重命名为 clientIp。
(4)删除字段
remove_field 可以实现删除某个字段的功能,下面是一个关于 mutate 插件中 remove_field 的示例(仅列出 filter 部分):
1
2
3
4
5
filter {
mutate {
remove_field => ["timestamp"]
}
}
这个示例表示将字段 timestamp 删除。
4、GeolP 地址归类查询
GeoIP 是最常见的免费 IP 地址归类查询库,当然也有收费版可以使用。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别、省市、经纬度等,此插件对于可视化地图和区域统计非常有用。下面是一个关于 GeoIP 插件的简单示例(仅列出 filter 部分):
1
2
3
4
5
filter {
geoip {
source => "clientIp"
}
}
其中,clientIp 字段是输出 IP 地址的一个字段。
5、示例
logstash-xxx.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
input {
stdin {
}
}
filter {
grok {
match => ["message", "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
remove_field => ["message"]
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
mutate {
remove_field => ["timestamp"]
rename => ["clientip","clientIp"]
gsub => ["referrer",'"',""]
split => ["referrer"," "]
}
geoip {
source => "clientIp"
fields => ["ip","country_name","region_name","city_name","longitude","latitude","timezone"]
}
}
output {
stdout {
codec => "rubydebug"
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/usr/local/logstash/logstash-7.17.25/bin/logstash -f /usr/local/logstash/logstash-7.17.25/config/logstash-xxx.conf
1.68.95.141 [20/Dec/2024:22:22:22 +0800] "GET / HTTP/1.1" 403 5039
{
"@timestamp" => 2024-12-20T14:22:22.000Z,
"host" => "logstash",
"@version" => "1",
"response" => "403",
"bytes" => "5039",
"referrer" => [
[0] "GET",
[1] "/",
[2] "HTTP/1.1"
],
"clientIp" => "1.68.95.141",
"geoip" => {
"latitude" => 37.0244,
"timezone" => "Asia/Shanghai",
"region_name" => "Shanxi",
"longitude" => 111.9125,
"country_name" => "China",
"city_name" => "Jiexiu",
"ip" => "1.68.95.141"
}
}