본문 바로가기
Programming/AWS

AWS 슬랙 알림이란?

by Chan_찬 2023. 1. 19.
728x90

AWS 솔루션들의 변경사항을 슬랙알림으로 받고 싶다.
로그인, codecommit 커밋, deploy, PR, s3, guardduty, waf 에 대한 알림추가하는 방법을 알려준다

AWS의 각 이벤트마다 슬랙으로 알림을 보내고 싶다면, 크게 2가지만 하면 된다.
EventBridge 등록하기 & lambda 함수 만들기, 추가적으로 SNS등록을 해야할 수도 있다


EventBridge 룰 생성

GuardDuty -> CloudWatch Events -> EventBridge -> SNS -> Lambda

EventBridge > rule > event pattern

  • guard duty
{
  "source": ["aws.guardduty"],
  "detail-type": ["GuardDuty Finding"],
  "detail": {
    "severity": [4, 5, 6, 7, 8, 9, 10]
  }
}

CloudWatch -> dashboard -> alert

  • WAF & Shield
<WAF & Shield:본인의 url주소> <waf 대시보드의 차단값 평균이 4를 넘음:대시보드주소>으로 알람이 발생하였습니다.

임계값: 5 분 내 1개의 데이터 포인트에 대한 BlockedRequests > 4
지표 이름: BlockedRequests
WebACL : LB-PW-Prod
Rule : ALL
통계: 평균
기간: 5분

{
  "source": [
    "aws.cloudwatch"
  ],
  "detail-type": [
    "CloudWatch Alarm State Change"
  ],
  "resources": [
    "arn:aws:cloudwatch:ap-northeast-2:585251656769:alarm:wafv2_blocked_over4"
  ]
}
  • 입력경로
{
  "action": "$.detail.service.action",
  "ip1": "$.detail.service.action.networkConnectionAction.remoteIpDetails.ipAddressV4",
  "ip2": "$.detail.service.action.awsApiCallAction.remoteIpDetails.ipAddressV4",
  "name": "$.detail.type"
}
  • 입력 템플릿
"GuardDuty Finding"
"공격유형 : <name>"
"<ip1><ip2> : <action>"

CloudTrail -> CloudWatch Events -> EventBridge -> Lambda

EventBridge > rule > event pattern

  • aws-console-sign-in
{
  "source": ["aws.signin"],
  "detail-type": ["AWS Console Sign In via CloudTrail"]
}
  • aws-creatkeypair
{
  "source": ["aws.ec2"],
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "eventSource": ["ec2.amazonaws.com"]
  }
}
  • codepipeline-deploy
{
  "source": ["aws.codepipeline"],
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "eventSource": ["codepipeline.amazonaws.com"]
  }
}
  • pr_codereview
{
  "source": ["aws.codecommit"],
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "eventSource": ["codecommit.amazonaws.com"]
  }
}
  • s3
{
  "source": ["aws.s3"]
}

람다 생성

#noti.py

import json
import logging
import os
import time

from datetime import datetime
from datetime import timedelta
from base64 import b64decode
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

HOOK_URL = os.environ['HOOK_URL']
PR_CHANNEL = os.environ['slackChannel']
MONITORING_CHANNEL = os.environ['monitoringChannel']


logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    #logger.info("Event: " + str(event))

    try:
        check_duty(event)
        logger.info("check_duty")
    except:
        try:
            check_user(event)
            logger.info("check_user")
        except:
            try:
                check_s3(event)
                logger.info("check_s3")
            except:
                try:
                    check_deploy(event)
                    logger.info("check_deploy")
                except:
                    check_pullrequest(event)
                    #logger.info("check_pullrequest")


def get_account_info(data):
    accountType = data['userIdentity']['type']

    # Root 인지 IAMUser 인지 구분
    if accountType == "Root":
        accountUserName = "Root"
    elif accountType == "IAMUser":
        accountUserName = data['userIdentity']['userName']
    else:
        accountUserName = " "
    return accountType, accountUserName

def get_kst_time(utc_time):
    return datetime.strptime(utc_time, '%Y-%m-%dT%H:%M:%S') - timedelta(hours=-9) #KST 시간 변환

def check_duty(event):
    title = "GuardDuty"
    msg = event['Records'][0]['Sns']['Message']
    text = f"*{title}*\nDetect something, check NACL\n{msg}"
    if "wafv2_blocked_over4" in msg:
        title = "WAF & Shield"
        text = f"*{title}*\n{msg.split('AlarmDescription')[1].split('AWSAccountId')[0]}"

    msg = {
        "channel": MONITORING_CHANNEL , #"#모니터링",
        "username": title,
        "icon_emoji": ":warning:",
        "attachments": [{
                "text": text,
                "color": "#ba1f00"
            }]
    }
    return send_slack(msg)


def check_pullrequest(event):
    color = None
    repo_be = "PayWatchBackend"
    repo_fe = "Emmaus-Finance"
    link = '<https://ap-northeast-2.console.aws.amazon.com/codesuite/codecommit/repositories/{1}/pull-requests/{0}/activity?region=ap-northeast-2|{1}: {0}>'
    cm_link = '<https://ap-northeast-2.console.aws.amazon.com/codesuite/codecommit/repositories/{1}/pull-requests/{0}/commit/{2}?region=ap-northeast-2|{1} #{0}>'
    kst_login_time = get_kst_time(event['time'][:19])

    class PR_BASE:
        def __init__(self):
            self.string = ""
            self.params = ""
            self.sum_str = ""
        def __str__(self):
            return self.sum_str
        def p(self):
            return str(self.string.format(*self.params))
        def add(self, string, params, nl=""):
            self.string = string
            self.params = params
            self.sum_str += self.p() + nl

    class PR:
        def __init__(self, string, params):
            self.string = string
            self.params = params
        def __str__(self):
            return self.string.format(*self.params)
        def p(self):
            return str(self.string.format(*self.params))

    class PR_CMNT(PR_BASE):
        """댓글, 리액션 이모지."""
        def __init__(self, pr_id, ac_v, comment, repo_name, commit_id="", lc_path="", lc_line=""):
            super().__init__()
            self.add(link, (pr_id, repo_name, commit_id)) if pr_id else ""
            self.add(' {0}', (ac_v,)) if ac_v else ""
            self.add(' {0}', (comment.get('content'),)) if comment else ""

    class PR_APPR(PR_BASE):
        """승인."""
        def __init__(self, pr_id, repo_name):
            super().__init__()
            self.add(cm_link, (pr_id, repo_name)) if pr_id else ""

    class PR_ELSE(PR_BASE):
        """PR생성, Merge."""
        def __init__(self, pr_id, repo_name, title,  src, dst, description=""):
            super().__init__()
            _src = src.split('heads/')[1] if src else "None"
            _dst = dst.split('heads/')[1] if dst else "None"
            self.add('*{0}*', (repo_name,), "\n")
            self.add('{0} -> *{1}*', (_src, _dst), "\n\n")
            self.add('*<https://ap-northeast-2.console.aws.amazon.com/codesuite/codecommit/repositories/{1}/pull-requests/{0}/details?region=ap-northeast-2|#{0} {2}>*\n{3}', (pr_id, repo_name, title, description))



    _params_dict = {
        "pullRequestCreated": "PR 생성",
        "CreatePullRequest": "PR 생성",
        "GetCommitsFromMergeBase": "PR Merge",
        "MergePullRequestByThreeWay": "PR Merge 3way",
        "MergePullRequestBySquash": "PR Merge & Squash",
        "pullRequestApprovalStateChanged": "승인",
        "UpdatePullRequestApprovalState": "승인",
        "pullRequestSourceBranchUpdated": "소스브랜치 수정",
        "PostCommentForPullRequest": "댓글",
        "PostCommentReply": "답글",
        "PutCommentReaction": "리액션"
    }
    data = event['detail']

    if 'eventName' in data:
        _event_name = data['eventName']
        if data['eventName'] not in _params_dict.keys():
            #print(f"[PR] eventName {_event_name}")
            return None

    emj = ":interrobang:"
    caller = 'callerUserArn' in data and data['callerUserArn'].split('/')[1]
    caller = caller or data['userIdentity']['userName']

    elems = data['responseElements'] or data['requestParameters']

    # 댓글, 액션
    if _event_name in ["PostCommentForPullRequest", "PostCommentReply", "PutCommentReaction"]:
        _text = f"[{_params_dict[_event_name]}] {caller}\n"
        rp_nm = None
        try:
            rp_nm = data['resources'][0].get('ARN').split(':')[-1]
        except KeyError:
            rp_nm = repo_be
        _text += str(PR_CMNT(
            elems.get('pullRequestId'),
            elems.get('reactionValue'),
            elems.get('comment'),
            elems.get('repositoryName', rp_nm),
            elems.get('afterCommitId')
        ))

    # 승인
    elif _event_name in ["UpdatePullRequestApprovalState","pullRequestApprovalStateChanged"]:
        emj = ":thumbsup_all:"
        rp_nm = None
        try:
            rp_nm = data['resources'][0].get('ARN').split(':')[-1]
        except KeyError:
            rp_nm = repo_be
        _text = f"[{_params_dict[_event_name]}] {caller} - {PR_APPR(elems.get('pullRequestId'), elems.get('repositoryName', rp_nm))} "
        #_text += str(PR_APPR(elems.get('pullRequestId')))


    # PR 생성& Merge 등
    else:
        if _event_name in ["GetCommitsFromMergeBase", "MergePullRequestByThreeWay", "MergePullRequestBySquash"]:
            emj = ":end:"
            color = "#59acb9"
        else:
            emj = ":on:"
            color = "#b4a100"
        r_params = data['responseElements']['pullRequest']
        r_target = r_params['pullRequestTargets'][0]

        _text = f"[{_params_dict[_event_name]}] {caller}  - "
        _text += str(PR_ELSE(
            r_params.get('pullRequestId'),
            r_target.get('repositoryName'),
            r_params.get('title'),
            r_target.get('sourceReference'),
            r_target.get('destinationReference'),
            r_params.get('description') if "on" in emj else ""
        ))
        e_time = f"*발생시간* - {kst_login_time}\n"

    slack_message = {
        'channel': PR_CHANNEL,
        "username": "CodeCommit",
        "icon_emoji": emj or ":interrobang:",
        "attachments": [{
                "color": color or "#d1d1d1",
                'text': _text
            }]
    }
    return send_slack(slack_message)


#RunInstances
def check_run_instance(event):
    data = event['detail']

    kst_login_time = get_kst_time(data['eventTime'][:19])

    try:
        # 배포
        i_set = data['responseElements'].get('instancesSet')
        if i_set is None: raise KeyError
        items = i_set['items'][0]
        i_type = items['instanceType']
        i_ipaddr = items['privateIpAddress']
        i_name = ''.join((d['value'] for d in items['tagSet']['items'] if d['key'] == "Name"))
        if not i_name or i_name == 'None': raise KeyError

        title = f"[Run] - {data['eventName']}"
        _text = f"*{title}*\n*발생시간* - {kst_login_time}\n*Run {i_name}*\n{i_type} {i_ipaddr}"
    except KeyError:
        raise KeyError

    slack_message = {
        'channel': MONITORING_CHANNEL,
        "username": "EC2",
        "icon_emoji": ":rocket:",
        "attachments": [{
                "color": "#59acb9",
                'text': _text
            }]
    }
    return send_slack(slack_message)


def check_deploy(event):
    data = event['detail']

    #kst_login_time = get_kst_time(data['eventTime'][:19])
    try:
        # 배포
        repo_name = data['requestParameters'].get('name')
        if repo_name is None:
            return check_run_instance(event)
        title = f"[Deploy] - {data['eventName']}"
        #_text = f"*{title}*\n*발생시간* - {kst_login_time}\n*Deploy {repo_name}*"
        _text = f"*{title}*\n*{repo_name}*"
    except KeyError:
        raise KeyError

    slack_message = {
        'channel': PR_CHANNEL,
        "username": "Pipeline",
        "icon_emoji": ":rocket:",
        "attachments": [{
                "color": "#00b506",
                'text': _text
            }]
    }
    return send_slack(slack_message)



def check_user(event):
    emj = ""
    data = event['detail']

    accountType, accountUserName = get_account_info(data)
    # KST 시간 변환
    kst_login_time = get_kst_time(data['eventTime'][:19])

    # Slack Message Title
    title = "[%s] %s AWS Console Login" %(accountType, accountUserName)

    # sourceIPAddress
    sourceIPAddress = data['sourceIPAddress']
    _text = "*%s*\n>>>*접속시간*\n%s\n*접속 IPAddress*\n%s\n*Console Login 결과*\n%s\n*MFA 사용유무*\n%s" % (title, kst_login_time, sourceIPAddress, "", "")
    try:
        # MFA 사용 유무
        usedMFA = data['additionalEventData'].get('MFAUsed')
        if usedMFA is None:
            raise KeyError
        # 접속 성공 유무
        loginStatusCheck = data['responseElements'].get('ConsoleLogin')
        _text = "*%s*\n>>>*접속시간*\n%s\n*접속 IPAddress*\n%s\n*Console Login 결과*\n%s\n*MFA 사용유무*\n%s" % (title, kst_login_time, sourceIPAddress, loginStatusCheck, usedMFA)
    except KeyError:
        raise KeyError

    slack_message = {
        'channel': MONITORING_CHANNEL,
        "username": "IAM",
        "icon_emoji": emj or ":technologist:",
        'text': _text
    }
    return send_slack(slack_message)


def check_s3(event):
    emj = ""
    data = event['detail']
    #kst_login_time = get_kst_time(data['eventTime'][:19])

    try:
        accountType, accountUserName = get_account_info(data)
        title = f"{data['eventName']} - {accountUserName}"
        _text = f"*{title}*\n*IP*: {data['sourceIPAddress']}\n*BucketName*: {data['requestParameters']['bucketName']}\n*Agent*: {data['userAgent']}"
    except KeyError:
        raise KeyError

    slack_message = {
        'channel': MONITORING_CHANNEL,
        "username": "S3",
        "icon_emoji": emj or ":takeout_box:",
        'text': _text
    }
    return send_slack(slack_message)


def send_slack(slack_message):
    req = Request(HOOK_URL, json.dumps(slack_message).encode('utf-8'))
    try:
        response = urlopen(req)
        response.read()
        logger.info("Message posted")
    except HTTPError as e:
        logger.error("Request failed: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("Server connection failed: %s", e.reason)
728x90
728x90
Buy me a coffeeBuy me a coffee

댓글