ELK接收paloalto日志并钉钉告警

本人环境介绍

1
2
3
4
5
6
ubuntu 14.04
python 2.7
kibana-5.5.2
logstash-5.5.2
elasticsearch-5.5.2
paloalto软件版本7.1.14

1、ELK安装

elasticsearch下载地址:https://www.elastic.co/cn/downloads/elasticsearch

kibana下载地址:https://www.elastic.co/cn/downloads/kibana

Logstash下载地址:https://www.elastic.co/cn/downloads/logstash

elasticsearch和kibana配置就不多说了,比较简单。

2、logstash的配置

新建syslog.conf文件,此版本的paloalto的有50多个字段,暴力配置如下,所有转发过来的日志直接丢到elasticsearch里面,最后用kibana展示

启动logstash:nohup ./bin/logstash -f syslog.conf &

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
input{
syslog{#输入syslog
type => "syslog"
port => 514
}
}

filter {
grok {
match => ["message", "%{DATA:Domain}\,%{DATA:Receive-Time}\,%{DATA:Serial}\,%{DATA:Type}\,%{DATA:Threat-Type}\,%{DATA:Config-Version}\,%{DATA:Generate-Time}\,%{IP:Source-address}\,%{IP:Destination-address}\,%{DATA:NAT-Source-IP}\,%{DATA:NAT-Destination-IP}\,%{DATA:Rule}\,%{DATA:Source-User}\,%{DATA:Destination-User}\,%{DATA:Application}\,%{DATA:Virtual-System}\,%{DATA:Source-Zone}\,%{DATA:Destination-Zone}\,%{DATA:Inbound-Interface}\,%{DATA:Outbound-Interface}\,%{DATA:Log-Action}\,%{DATA:Time-Logged}\,%{DATA:Session-ID}\,%{DATA:Repeat-Count}\,%{DATA:Source-Port}\,%{DATA:Destination-Port}\,%{DATA:NAT-Source-Port}\,%{DATA:NAT-Destination-Port}\,%{DATA:Flags}\,%{DATA:IP-Protocol}\,%{DATA:Action}\,%{DATA:URL}\,%{DATA:Threat-Content-Name}\,%{DATA:Category}\,%{DATA:Severity}\,%{DATA:Direction}\,%{DATA:Sequence-Number}\,%{DATA:Action-Flags}\,%{DATA:Source-Country}\,%{DATA:Destination-Country}\,%{DATA:cpadding}\,%{DATA:contenttype}\,%{DATA:pcap_id}\,%{DATA:filedigest}\,%{DATA:cloud}\,%{DATA:url_idx}\,%{DATA:user_agent}\,%{DATA:filetype}\,%{DATA:xff}\,%{DATA:referer}\,%{DATA:sender}\,%{DATA:subject}\,%{DATA:recipient}\,%{DATA:reportid}\,%{DATA:dg_hier_level_1}\,%{DATA:dg_hier_level_2}\,%{DATA:dg_hier_level_3}\,%{DATA:dg_hier_level_4}\,%{DATA:Virtual-System-Name}\,%{DATA:Device-Name}\,%{DATA:file_url}"]
}
}

output{
elasticsearch{
hosts => ["x.x.x.x:9200"]
index => "syslog"
}
# stdout{#控制台打印输出
# codec => rubydebug
# }
}

3、paloalto设置

一共有寄个地方要注意一下,否则日志转发不成功

1、创建syslog,转发到logstash服务器

2、配置转发用syslog

3、配置你想要的日志类型和严重性

4、在安全策略出匹配设置的日志转发


5、最后记得提交配置,否则不生效

最终效果


4、elastalert设置

elastalert:https://github.com/Yelp/elastalert.git
钉钉告警:https://github.com/xuyaoqiang/elastalert-dingtalk-plugin
部分依赖:

1
sudo apt-get install python-dev libffi-dev

1、安装过程如下:

1
2
3
4
5
6
pip install elastalert
或者
git clone https://github.com/Yelp/elastalert.git
cd elastalert
sudo python setup.py install
sudo pip install -r requirements.txt

其中有部分依赖可能安装错误,请单独下载安装既可。

2、安装完继续
在elasticsearch中创建elastalert的日志索引

1
sudo elastalert-create-index --index elastalert

根据自己的情况,填入elasticsearch的相关信息,关于
elastalert_status部分直接回车默认的即可。
如下所示:

1
2
3
4
5
6
7
8
9
10
11
Enter Elasticsearch host: localhost
Enter Elasticsearch port: 9200
Use SSL? t/f:
Enter optional basic-auth username (or leave blank):
Enter optional basic-auth password (or leave blank):
Enter optional Elasticsearch URL prefix (prepends a string to the URL of every request):
Name of existing index to copy? (Default None)
Elastic Version:5
Mapping used for string:{'index': 'not_analyzed', 'type': 'string'}
New index elastalert created
Done!

3、创建配置文件
3.1、修改elastalert的配置文件
下载https://github.com/xuyaoqiang/elastalert-dingtalk-plugin,把其中的把elastalert-dingtalk-plugin中的elastalert_modules、rules和config.yaml复制到elastalert下
修改config.yaml对应配置

1
2
es_host: elasticsearch 地址
es_port: elasticsearch 端口

3.2、修改rules的配置文件

官方有很多rules规则可以去看官方文档:
http://elastalert.readthedocs.io/en/latest/ruletypes.html#rule-types

需添加必要的配置, 修改添加报警规则xxx.yaml,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
name: IPS安全告警
#唯一值重复告警规则
type: cardinality

#es_host: localhost
#es_port: 9200

# Index to search, wildcard supported
index: syslog

#自定义的唯一值字段
cardinality_field: Source-address.keyword

#最小5次触发规则
min_cardinality: 5

#max_cardinality: 5

# 60秒内
timeframe:
seconds: 30

# 5 分钟内相同的报警不会重复发送
realert:
minutes: 5

# 指数级扩大 realert 时间,中间如果有报警,
# 则按照 5 -> 10 -> 20 -> 40 -> 60 不断增大报警时间到制定的最大时间,
# 如果之后报警减少,则会慢慢恢复原始 realert 时间
exponential_realert:
hours: 1

# ES 查询,用以过滤
#filter:
#- term:
# Severity: "high"
# (Required)
# The alert is use when a match is found
alert:
- "debug"
#你自己定义的钉钉告警脚本
- "elastalert_modules.dingtalk_alert.DingTalkAlerter"

dingtalk_webhook: 在钉钉群中添加机器人可以获取
dingtalk_msgtype: "text"

4、钉钉的配置


5、配置elastalert,变为自定义告警内容
5.1、启用钉钉的报警
原生告警比较不友好

修改为:

修改了elastalert-dingtalk-plugin-master\elastalert_modules\dingtalk_alert.py里面的代码,代码如下:
为了获取告警时间,然后对这个时间变为时间段,到es里面查询,获取对应的字段值,这个里的时间转换比较乱,代码写的渣,大佬可以忽略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: xuyaoqiang
@contact: xuyaoqiang@gmail.com
@date: 2017-09-14 17:35
@version: 0.0.0
@license:
@copyright:

"""
import json
import requests
from elastalert.alerts import Alerter, DateTimeEncoder
from requests.exceptions import RequestException
from elastalert.util import EAException
from elasticsearch import Elasticsearch
import time,datetime


class DingTalkAlerter(Alerter):

required_options = frozenset(['dingtalk_webhook', 'dingtalk_msgtype'])

def __init__(self, rule):
super(DingTalkAlerter, self).__init__(rule)
self.dingtalk_webhook_url = self.rule['dingtalk_webhook']
self.dingtalk_msgtype = self.rule.get('dingtalk_msgtype', 'text')
self.dingtalk_isAtAll = self.rule.get('dingtalk_isAtAll', False)
self.digtalk_title = self.rule.get('dingtalk_title', '')

def format_body(self, body):
#if body !="":
return body.encode('utf8')

def alert(self, matches):
headers = {
"Content-Type": "application/json",
"Accept": "application/json;charset=utf-8"
}
body = self.create_alert_body(matches)
# 获取对应时间段,并查询到对应可疑ip地址
get_time = body.split('\n')[4].split(' ')[1]
times = get_time.split('.')[0].split(':')[:2]
t = times[0]
if times[1]=='00' or times[1]=='01':
t1=int(times[1])
else:
t1 = int(times[1]) - 2
if times[1]=='58' or times[1]=='59':
t2=int(times[1])
else:
t2 = int(times[1]) + 2
# 将其转换为时间数组
timeStruct = time.strptime(t + ':' + str(t1), "%Y-%m-%dT%H:%M")
# 转换为时间戳:
timeStamp1 = int(time.mktime(timeStruct))
# 时间戳转换为指定格式日期
localTime = time.localtime(timeStamp1)
gt = time.strftime("%Y-%m-%dT%H:%M", localTime)
timeStruct = time.strptime(t + ':' + str(t2), "%Y-%m-%dT%H:%M")
# 转换为时间戳:
timeStamp2 = int(time.mktime(timeStruct))
# 时间戳转换为指定格式日期
localTime = time.localtime(timeStamp2)
lt = time.strftime("%Y-%m-%dT%H:%M", localTime)
# print(gt+'\n'+lt)
es = Elasticsearch("10.11.10.245:9200")
body = {
"query": {
"range" : {
"@timestamp" : {
"gt" : gt,
"lt": lt
}
}
}
}
res = es.search(index="syslog", body=body)
text = res['hits']['hits']
payload = {}
if len(text) != 0:
sip = text[0]['_source']['Source-address']
dip = text[0]['_source']['Destination-address']
dport = text[0]['_source']['Destination-Port']
atype = text[0]['_source']['Threat-Content-Name']
ntime = text[0]['_source']['Time-Logged']
payload = {
"msgtype": self.dingtalk_msgtype,
"text": {
"content": "IPS安全告警\n发现源ip地址: %s 在30秒内,对服务器ip:%s 的 %s 端口进行了5次攻击,攻击类型为 %s,请排除或确认攻击!\n(攻击时间点:%s)" % (sip, dip, dport, atype, ntime)
},
"at": {
"isAtAll":False
}
}
try:
response = requests.post(self.dingtalk_webhook_url,
data=json.dumps(payload, cls=DateTimeEncoder),
headers=headers)
response.raise_for_status()
except RequestException as e:
raise EAException("Error request to Dingtalk: {0}".format(str(e)))

def get_info(self):
return {
"type": "dingtalk",
"dingtalk_webhook": self.dingtalk_webhook_url
}
pass

6、最后开启所有组件

1
2
3
4
./elasticsearch-5.5.2/bin/elasticsearch &
./kibana-5.5.2-linux-x86_64/bin/kibana &
./logstash-5.5.2/bin/logstash -f /xxxx/logstash-5.5.2/syslog.conf &
python -m ./elastalert/elastalert.elastalert --verbose &

至此大功告成,其实elk还可以接收各种日志,自己做分析,然后告警,本文只是其中一个场景,大家可以收集所有日志一起做集中告警。
谢谢各位大佬捧场。

⭐您的支持将鼓励我继续创作⭐