반응형
AWS 솔루션들의 변경사항을 슬랙알림으로 받고 싶다.
콘솔 로그인, CodeCommit 커밋, 배포(deploy), PR, S3, GuardDuty, WAF 이벤트에 대한 슬랙 알림을 추가하는 방법을 알려준다.
AWS의 각 이벤트마다 슬랙으로 알림을 보내고 싶다면, 크게 2가지만 하면 된다.
EventBridge 룰 등록하기 & Lambda 함수 만들기. 경우에 따라 추가로 SNS 등록을 해야 할 수도 있다.
EventBridge 룰 생성
GuardDuty -> CloudWatch Events -> EventBridge -> SNS -> Lambda
EventBridge > rule > event pattern
- guard duty
{
"source": ["aws.guardduty"],
"detail-type": ["GuardDuty Finding"],
"detail": {
"severity": [4, 5, 6, 7, 8, 9, 10]
}
}
CloudWatch -> dashboard -> alert
- WAF & Shield
<WAF & Shield:본인의 url주소> <waf 대시보드의 차단값 평균이 4를 넘음:대시보드주소>으로 알람이 발생하였습니다.
임계값: 5 분 내 1개의 데이터 포인트에 대한 BlockedRequests > 4
지표 이름: BlockedRequests
WebACL : LB-PW-Prod
Rule : ALL
통계: 평균
기간: 5분
{
"source": [
"aws.cloudwatch"
],
"detail-type": [
"CloudWatch Alarm State Change"
],
"resources": [
"arn:aws:cloudwatch:ap-northeast-2:585251656769:alarm:wafv2_blocked_over4"
]
}
- 입력경로
{
"action": "$.detail.service.action",
"ip1": "$.detail.service.action.networkConnectionAction.remoteIpDetails.ipAddressV4",
"ip2": "$.detail.service.action.awsApiCallAction.remoteIpDetails.ipAddressV4",
"name": "$.detail.type"
}
- 입력 템플릿
"GuardDuty Finding"
"공격유형 : <name>"
"<ip1><ip2> : <action>"
CloudTrail -> CloudWatch Events -> EventBridge -> Lambda
EventBridge > rule > event pattern
- aws-console-sign-in
{
"source": ["aws.signin"],
"detail-type": ["AWS Console Sign In via CloudTrail"]
}
- aws-creatkeypair
{
"source": ["aws.ec2"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["ec2.amazonaws.com"]
}
}
- codepipeline-deploy
{
"source": ["aws.codepipeline"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["codepipeline.amazonaws.com"]
}
}
- pr_codereview
{
"source": ["aws.codecommit"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["codecommit.amazonaws.com"]
}
}
- s3
{
"source": ["aws.s3"]
}
람다 생성
#noti.py
import json
import logging
import os
import time
from datetime import datetime
from datetime import timedelta
from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
# URL that send_slack() POSTs every notification payload to.
# All three settings are required: a missing variable raises KeyError at import time.
HOOK_URL = os.environ['HOOK_URL']
# Slack channel used for the pull-request and deploy notifications.
PR_CHANNEL = os.environ['slackChannel']
# Slack channel used for the GuardDuty/WAF, EC2, console-login and S3 notifications.
MONITORING_CHANNEL = os.environ['monitoringChannel']
# Root logger, raised to INFO so the handler log lines below are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
    """Route an incoming event to the first notifier that accepts it.

    The notifiers signal "this is not my kind of event" by raising (usually a
    ``KeyError`` on a field the event does not have), so they are tried in a
    fixed order until one completes.  ``check_pullrequest`` is the final
    fallback and its exceptions are allowed to propagate.

    Args:
        event: The raw event passed to the Lambda function.
        context: Lambda context object (unused).

    Returns:
        None.
    """
    # Order matters: earlier notifiers take precedence over later ones.
    handlers = (check_duty, check_user, check_s3, check_deploy)
    for handler in handlers:
        try:
            handler(event)
        except Exception as err:
            # ``Exception`` instead of a bare ``except`` so that SystemExit and
            # KeyboardInterrupt are no longer swallowed; the reason for the
            # skip is kept available at debug level.
            logger.debug("%s skipped: %r", handler.__name__, err)
            continue
        logger.info(handler.__name__)
        return
    check_pullrequest(event)
def get_account_info(data):
    """Return ``(identity type, display name)`` taken from ``data['userIdentity']``.

    The display name is ``"Root"`` for the root account, the ``userName``
    field for an ``IAMUser`` identity, and a single space for any other
    identity type.
    """
    identity_type = data['userIdentity']['type']
    # Distinguish the root account from a regular IAM user.
    if identity_type == "Root":
        return identity_type, "Root"
    if identity_type == "IAMUser":
        return identity_type, data['userIdentity']['userName']
    return identity_type, " "
def get_kst_time(utc_time):
    """Parse a ``YYYY-MM-DDTHH:MM:SS`` string and return it shifted forward by 9 hours.

    The result is a naive ``datetime`` (no tzinfo attached); the 9-hour shift
    is the UTC -> KST conversion.
    """
    parsed = datetime.strptime(utc_time, '%Y-%m-%dT%H:%M:%S')
    kst_offset = timedelta(hours=9)
    return parsed + kst_offset
def check_duty(event):
    """Forward the message of an SNS-delivered alarm to the monitoring channel.

    Reads ``event['Records'][0]['Sns']['Message']``.  When the message
    mentions the ``wafv2_blocked_over4`` alarm it is reported as
    "WAF & Shield" with only the text found between ``AlarmDescription`` and
    ``AWSAccountId``; otherwise it is reported as "GuardDuty" with the whole
    message.  Raises (KeyError/IndexError) when the event does not have that
    SNS shape.

    Returns:
        Whatever ``send_slack`` returns.
    """
    sns_message = event['Records'][0]['Sns']['Message']
    if "wafv2_blocked_over4" in sns_message:
        sender = "WAF & Shield"
        # Keep only the slice between the two alarm fields.
        description = sns_message.split('AlarmDescription')[1].split('AWSAccountId')[0]
        body = f"*{sender}*\n{description}"
    else:
        sender = "GuardDuty"
        body = f"*{sender}*\nDetect something, check NACL\n{sns_message}"
    payload = {
        "channel": MONITORING_CHANNEL,
        "username": sender,
        "icon_emoji": ":warning:",
        "attachments": [{
            "text": body,
            "color": "#ba1f00",
        }],
    }
    return send_slack(payload)
def check_pullrequest(event):
    """Post a CodeCommit pull-request activity notification to Slack.

    Only the ``eventName`` values listed in ``labels`` are reported; any other
    event (including one without ``eventName``) is ignored.  Three message
    shapes exist: comment/reply/reaction, approval, and PR creation/merge.

    Args:
        event: Event whose ``detail`` carries ``eventName`` plus
            ``responseElements`` and/or ``requestParameters``.

    Returns:
        The result of ``send_slack``, or ``None`` when the event is ignored.

    Raises:
        KeyError, TypeError, IndexError: for a creation/merge event whose
            ``responseElements.pullRequest`` structure is missing.
    """
    default_repo = "PayWatchBackend"
    activity_link = '<https://ap-northeast-2.console.aws.amazon.com/codesuite/codecommit/repositories/{1}/pull-requests/{0}/activity?region=ap-northeast-2|{1}: {0}>'
    details_link = '*<https://ap-northeast-2.console.aws.amazon.com/codesuite/codecommit/repositories/{1}/pull-requests/{0}/details?region=ap-northeast-2|#{0} {2}>*\n{3}'
    labels = {
        "pullRequestCreated": "PR 생성",
        "CreatePullRequest": "PR 생성",
        "GetCommitsFromMergeBase": "PR Merge",
        "MergePullRequestByThreeWay": "PR Merge 3way",
        "MergePullRequestBySquash": "PR Merge & Squash",
        "pullRequestApprovalStateChanged": "승인",
        "UpdatePullRequestApprovalState": "승인",
        "pullRequestSourceBranchUpdated": "소스브랜치 수정",
        "PostCommentForPullRequest": "댓글",
        "PostCommentReply": "답글",
        "PutCommentReaction": "리액션",
    }
    comment_events = {"PostCommentForPullRequest", "PostCommentReply", "PutCommentReaction"}
    approval_events = {"UpdatePullRequestApprovalState", "pullRequestApprovalStateChanged"}
    merge_events = {"GetCommitsFromMergeBase", "MergePullRequestByThreeWay", "MergePullRequestBySquash"}

    def _caller_name(detail):
        """Return who triggered the event, preferring ``callerUserArn``."""
        arn = detail.get('callerUserArn') or ""
        name = arn.split('/')[1] if '/' in arn else ""
        if name:
            return name
        identity = detail.get('userIdentity') or {}
        # Identities without ``userName`` used to raise KeyError here; fall
        # back to the last segment of the identity ARN instead.
        return (
            identity.get('userName')
            or (identity.get('arn') or "").split('/')[-1]
            or "unknown"
        )

    def _repo_name(detail, elems):
        """Return the repository name from the payload, the resource ARN, or the default."""
        name = elems.get('repositoryName')
        if name:
            return name
        try:
            return detail['resources'][0]['ARN'].split(':')[-1]
        except (KeyError, IndexError, TypeError, AttributeError):
            return default_repo

    def _branch(ref):
        """Strip everything up to the first ``heads/`` from a git reference."""
        if not ref:
            return "None"
        return ref.split('heads/', 1)[-1]

    data = event['detail']
    event_name = data.get('eventName')
    if event_name not in labels:
        return None

    label = labels[event_name]
    caller = _caller_name(data)
    elems = data.get('responseElements') or data.get('requestParameters') or {}
    emoji = ":interrobang:"
    color = "#d1d1d1"

    if event_name in comment_events:
        # Comment, reply or reaction: link followed by the reaction/comment text.
        parts = []
        pr_id = elems.get('pullRequestId')
        if pr_id:
            parts.append(activity_link.format(pr_id, _repo_name(data, elems)))
        reaction = elems.get('reactionValue')
        if reaction:
            parts.append(f" {reaction}")
        comment = elems.get('comment')
        if comment:
            parts.append(f" {comment.get('content')}")
        text = f"[{label}] {caller}\n" + "".join(parts)
    elif event_name in approval_events:
        emoji = ":thumbsup_all:"
        pr_id = elems.get('pullRequestId')
        # The commit-level link template previously used here needed a third
        # (commit id) argument that was never supplied, so str.format raised
        # IndexError on every approval; the activity link needs only the
        # PR id and repository name.
        pr_link = activity_link.format(pr_id, _repo_name(data, elems)) if pr_id else ""
        text = f"[{label}] {caller} - {pr_link} "
    else:
        # PR creation, source-branch update, or merge.
        is_merge = event_name in merge_events
        if is_merge:
            emoji, color = ":end:", "#59acb9"
        else:
            emoji, color = ":on:", "#b4a100"
        pull_request = data['responseElements']['pullRequest']
        target = pull_request['pullRequestTargets'][0]
        repo = target.get('repositoryName')
        source = _branch(target.get('sourceReference'))
        destination = _branch(target.get('destinationReference'))
        # The description is shown only for non-merge events; a missing one
        # is rendered as empty text rather than the string "None".
        description = "" if is_merge else (pull_request.get('description') or "")
        text = "".join((
            f"[{label}] {caller} - ",
            f"*{repo}*\n",
            f"{source} -> *{destination}*\n\n",
            details_link.format(
                pull_request.get('pullRequestId'),
                repo,
                pull_request.get('title'),
                description,
            ),
        ))

    slack_message = {
        'channel': PR_CHANNEL,
        "username": "CodeCommit",
        "icon_emoji": emoji,
        "attachments": [{
            "color": color,
            'text': text,
        }],
    }
    return send_slack(slack_message)
#RunInstances
def check_run_instance(event):
    """Report the first instance of an EC2 launch event to the monitoring channel.

    Args:
        event: Event whose ``detail`` has ``eventTime``, ``eventName`` and
            ``responseElements.instancesSet``.

    Returns:
        Whatever ``send_slack`` returns.

    Raises:
        KeyError: when the event has no ``instancesSet``, a required instance
            field is missing, or the instance has no usable ``Name`` tag.
            The exception now names the missing piece instead of being
            replaced by a blank ``KeyError``.
    """
    data = event['detail']
    kst_time = get_kst_time(data['eventTime'][:19])

    # A null/missing responseElements is treated like a missing instancesSet
    # so the caller always sees KeyError for "not a launch event".
    instances = (data.get('responseElements') or {}).get('instancesSet')
    if instances is None:
        raise KeyError('instancesSet')

    instance = instances['items'][0]
    instance_type = instance['instanceType']
    private_ip = instance['privateIpAddress']
    name = ''.join(
        tag['value'] for tag in instance['tagSet']['items'] if tag['key'] == "Name"
    )
    if not name or name == 'None':
        raise KeyError('Name tag')

    title = f"[Run] - {data['eventName']}"
    text = f"*{title}*\n*발생시간* - {kst_time}\n*Run {name}*\n{instance_type} {private_ip}"
    slack_message = {
        'channel': MONITORING_CHANNEL,
        "username": "EC2",
        "icon_emoji": ":rocket:",
        "attachments": [{
            "color": "#59acb9",
            'text': text,
        }],
    }
    return send_slack(slack_message)
def check_deploy(event):
    """Report a deploy event, identified by ``requestParameters.name``, to the PR channel.

    When the request has no ``name`` the event is handed to
    ``check_run_instance`` instead.

    Args:
        event: Event whose ``detail`` has ``requestParameters`` and ``eventName``.

    Returns:
        Whatever ``send_slack`` (or ``check_run_instance``) returns.

    Raises:
        KeyError: when ``requestParameters`` or ``eventName`` is missing; the
            original exception (naming the key) is propagated unchanged.
    """
    data = event['detail']
    # requestParameters may be present but null; treat that as "no name".
    params = data['requestParameters'] or {}
    pipeline_name = params.get('name')
    if pipeline_name is None:
        return check_run_instance(event)

    title = f"[Deploy] - {data['eventName']}"
    text = f"*{title}*\n*{pipeline_name}*"
    slack_message = {
        'channel': PR_CHANNEL,
        "username": "Pipeline",
        "icon_emoji": ":rocket:",
        "attachments": [{
            "color": "#00b506",
            'text': text,
        }],
    }
    return send_slack(slack_message)
def check_user(event):
    """Report a console sign-in event to the monitoring channel.

    An event is accepted only if ``detail.additionalEventData.MFAUsed`` is
    present; the message shows login time (KST), source IP, the
    ``ConsoleLogin`` result and the MFA flag.

    Args:
        event: Event whose ``detail`` has ``userIdentity``, ``eventTime``,
            ``sourceIPAddress``, ``additionalEventData`` and
            ``responseElements``.

    Returns:
        Whatever ``send_slack`` returns.

    Raises:
        KeyError: when a required field (including ``MFAUsed``) is missing.
            The exception names the missing key rather than being replaced
            by a blank ``KeyError``.
    """
    data = event['detail']
    account_type, user_name = get_account_info(data)
    kst_login_time = get_kst_time(data['eventTime'][:19])
    source_ip = data['sourceIPAddress']

    # MFAUsed is the marker that this really is a sign-in event.
    used_mfa = data['additionalEventData'].get('MFAUsed')
    if used_mfa is None:
        raise KeyError('MFAUsed')
    login_result = data['responseElements'].get('ConsoleLogin')

    title = f"[{account_type}] {user_name} AWS Console Login"
    text = (
        f"*{title}*\n>>>*접속시간*\n{kst_login_time}\n"
        f"*접속 IPAddress*\n{source_ip}\n"
        f"*Console Login 결과*\n{login_result}\n"
        f"*MFA 사용유무*\n{used_mfa}"
    )
    slack_message = {
        'channel': MONITORING_CHANNEL,
        "username": "IAM",
        "icon_emoji": ":technologist:",
        'text': text,
    }
    return send_slack(slack_message)
def check_s3(event):
    """Report an S3 API event (caller, source IP, bucket, user agent) to the monitoring channel.

    Args:
        event: Event whose ``detail`` has ``userIdentity``, ``eventName``,
            ``sourceIPAddress``, ``requestParameters.bucketName`` and
            ``userAgent``.

    Returns:
        Whatever ``send_slack`` returns.

    Raises:
        KeyError: when one of those fields is missing; the original exception
            (naming the key) is propagated instead of a blank ``KeyError``.
    """
    data = event['detail']
    _, user_name = get_account_info(data)
    title = f"{data['eventName']} - {user_name}"
    text = (
        f"*{title}*\n*IP*: {data['sourceIPAddress']}\n"
        f"*BucketName*: {data['requestParameters']['bucketName']}\n"
        f"*Agent*: {data['userAgent']}"
    )
    slack_message = {
        'channel': MONITORING_CHANNEL,
        "username": "S3",
        "icon_emoji": ":takeout_box:",
        'text': text,
    }
    return send_slack(slack_message)
def send_slack(slack_message):
    """POST *slack_message*, serialized as UTF-8 JSON, to ``HOOK_URL``.

    HTTP and connection failures are logged and swallowed, so a failed post
    never raises ``HTTPError``/``URLError`` to the caller.

    Args:
        slack_message: JSON-serializable dict forming the request body.

    Returns:
        None.
    """
    payload = json.dumps(slack_message).encode('utf-8')
    req = Request(HOOK_URL, payload)
    try:
        # The context manager closes the HTTP response, which was previously
        # left open.
        with urlopen(req) as response:
            response.read()
    except HTTPError as e:
        logger.error("Request failed: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("Server connection failed: %s", e.reason)
    else:
        logger.info("Message posted")
728x90
728x90
BIG
'Programming > AWS' 카테고리의 다른 글
AWS CodeWhisperer 개인 무료 - copilot 대체 (0) | 2023.04.17 |
---|---|
AWS : amazon-personalized (0) | 2021.03.27 |
AWS bucket 에 트리거는 하나만 (0) | 2020.06.02 |
AWS lambda deploy 를 위해서 필요한 정책 (0) | 2020.06.01 |
댓글