본문 바로가기

Python/Boto3

AWS Lambda를 통해 현재 Security Group List를 SES 로 받아보기

앞선 EC2 List 받아보기와 구성은 완전 똑같습니다.

 

위의 글을 참고해주세요

 

람다 소스코드만 바꾸면 되는 부분이기 때문에 소스코드와 이에 대한 설명만 하도록 하겠습니다.

 

import os
import boto3
import smtplib
import datetime
import pandas as pd
from openpyxl import load_workbook
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from openpyxl.styles import Font, Alignment, Border, Side, PatternFill, Color

def deco_cell(ws, cell, bold, horizon, color):
    box = Border(left=Side(border_style="thin", color='FF000000'),
                 right=Side(border_style="thin", color='FF000000'),
                 top=Side(border_style="thin", color='FF000000'),
                 bottom=Side(border_style="thin", color='FF000000'),
                 diagonal=Side(border_style="thin", color='FF000000'),
                 diagonal_direction=0, outline=Side(border_style="thin", color='FF000000'),
                 vertical=Side(border_style="thin", color='FF000000'),
                 horizontal=Side(border_style="thin", color='FF000000'))

    ws[cell].font = Font(name="맑은 고딕", size=10, bold=bold)
    ws[cell].alignment = Alignment(horizontal=horizon, vertical="center")
    if color != '':
        ws[cell].fill = PatternFill(patternType='solid', fgColor=Color(color))
    ws[cell].border = box

def deco_excel(rows, save_path):
    wb = load_workbook(filename=save_path, read_only=False, data_only=False)
    ws = wb["List"]

    # ALL
    for each_row in ws.rows:
        for each_row_cell in each_row:
            cell = str(each_row_cell)[6:-1].split('.')[-1]
            deco_cell(ws, cell, False, 'center', '')

    # Header
    for i in range(ord('A'), ord('H')):
        for j in rows:
            ws.merge_cells("B" + str(j + 1) + ":D" + str(j + 1))
            ws.merge_cells("E" + str(j + 1) + ":G" + str(j + 1))
            deco_cell(ws, chr(i) + str(j + 1), True, 'center', 'c6e0b4')
            deco_cell(ws, chr(i) + str(j + 2), True, 'center', 'c6e0b4')

    # ALL Width
    ws.column_dimensions['A'].width = 31
    ws.column_dimensions['B'].width = 8
    ws.column_dimensions['C'].width = 22
    ws.column_dimensions['D'].width = 22
    ws.column_dimensions['E'].width = 8
    ws.column_dimensions['F'].width = 22
    ws.column_dimensions['G'].width = 22
    wb.save(save_path)

def GetKey(name):
    ssm = boto3.client('ssm')
    response = ssm.get_parameter(Name=name, WithDecryption=True)
    key = response['Parameter']['Value']
    return key

def ses(RECIPIENT, save_path, year, month):
    SENDER = "phh129@lotte.net"
    SENDERNAME = 'Security Group List'
    # CC = ["phh129@lotte.net", "hyunho129@nave.com"]
    USERNAME_SMTP = "SES SMTP KEY ID"
    PASSWORD_SMTP = GetKey('SES SMTP KEY ID')

    HOST = "email-smtp.us-east-1.amazonaws.com"
    PORT = 587

    AWS_REGION = "us-east-1"
    SUBJECT = "LAMBDA AWS Security Group List Info"

    ATTACHMENT = save_path

    BODY_TEXT = "LAMBDA AWS Security Group List Info"

    BODY_HTML = """\
    <html>
    <head></head>
    <body>
    <h1> AWS Security Group List </h1>
    <p>멘트멘트멘트.</p><br><br>
    <p>""" + year + "년 " + month + """ 월 Security Group List 레포트 입니다.</p><br><br>
    <p>감사합니다.</p>
    </body>
    </html>
    """

    CHARSET = "UTF-8"
    msg = MIMEMultipart('alternative')
    msg['Subject'] = SUBJECT
    msg['From'] = SENDER
    # msg['CC'] = ", ".join(CC)
    msg['To'] = RECIPIENT

    part1 = MIMEText(BODY_TEXT, 'plain')
    part2 = MIMEText(BODY_HTML, 'html')
    msg.attach(part1)
    msg.attach(part2)
    att = MIMEApplication(open(ATTACHMENT, 'rb').read())
    att.add_header('Content-Disposition', 'attachment', filename=os.path.basename(ATTACHMENT))
    msg.attach(att)
    try:
        server = smtplib.SMTP(HOST, PORT)
        server.ehlo()
        server.starttls()
        server.ehlo()
        server.login(USERNAME_SMTP, PASSWORD_SMTP)
        server.sendmail(SENDER, RECIPIENT, msg.as_string())
        # server.sendmail(SENDER, CC, msg.as_string())
        server.close()
    except Exception as e:
        print("Error: ", e)
    else:
        print("Email sent!")

def append_df_to_excel(filename, df, sheet_name='Sheet1', startrow=None, truncate_sheet=False, **to_excel_kwargs):
    if 'engine' in to_excel_kwargs:
        to_excel_kwargs.pop('engine')

    writer = pd.ExcelWriter(filename, engine='openpyxl')

    try:
        writer.book = load_workbook(filename)
        if startrow is None and sheet_name in writer.book.sheetnames:
            startrow = writer.book[sheet_name].max_row

        if truncate_sheet and sheet_name in writer.book.sheetnames:
            idx = writer.book.sheetnames.index(sheet_name)
            writer.book.remove(writer.book.worksheets[idx])
            writer.book.create_sheet(sheet_name, idx)

        writer.sheets = {ws.title:ws for ws in writer.book.worksheets}
    except FileNotFoundError:
        pass

    if startrow is None:
        startrow = 0

    df.to_excel(writer, sheet_name, startrow=startrow, **to_excel_kwargs)
    writer.save()

def describe_security_groups(save_path, security_group_infomations):
    row = 0
    col = 0
    rows = []
    for security_group_infomation in security_group_infomations:
        rows.append(row)
        name = security_group_infomation['GroupName']
        in_port = []
        in_source = []
        in_description = []
        for info in security_group_infomation['IpPermissions']:
            try:
                port = info['FromPort']
            except:
                port = info['IpProtocol']
                if port == '-1' or port == -1:
                    port = 'all'
            if len(info['IpRanges']) != 0:
                for IpRange in info['IpRanges']:
                    source = IpRange["CidrIp"]
                    try:
                        description = IpRange['Description']
                    except:
                        description = ""
                    in_port.append(port)
                    in_source.append(source)
                    in_description.append(description)
            else:
                for UserIdGroupPair in info['UserIdGroupPairs']:
                    source = UserIdGroupPair['GroupId']
                    try:
                        description = UserIdGroupPair['Description']
                    except:
                        description = ""
                    in_port.append(port)
                    in_source.append(source)
                    in_description.append(description)
        out_port = []
        out_destination = []
        out_description = []
        for info in security_group_infomation['IpPermissionsEgress']:
            try:
                port = info['FromPort']
            except:
                port = info['IpProtocol']
                if port == '-1' or port == -1:
                    port = 'all'
            if len(info['IpRanges']) != 0:
                for IpRange in info['IpRanges']:
                    source = IpRange["CidrIp"]
                    try:
                        description = IpRange['Description']
                    except:
                        description = ""
                    out_port.append(port)
                    out_destination.append(source)
                    out_description.append(description)
            else:
                for UserIdGroupPair in info['UserIdGroupPairs']:
                    source = UserIdGroupPair['GroupId']
                    try:
                        description = UserIdGroupPair['Description']
                    except:
                        description = ""
                    out_port.append(port)
                    out_destination.append(source)
                    out_description.append(description)
        df = pd.DataFrame(["Inbound"])
        append_df_to_excel(save_path, df, sheet_name="List", startrow=row, startcol=col + 1, index=False, header=False)
        df = pd.DataFrame(["Outbound"])
        append_df_to_excel(save_path, df, sheet_name="List", startrow=row, startcol=col + 4, index=False, header=False)
        df = pd.DataFrame({"Name": name}, index=[0])
        append_df_to_excel(save_path, df, sheet_name="List", startrow=row + 1, startcol=col, index=False)
        df = pd.DataFrame({"Port": in_port, "Source": in_source, "Description": in_description})
        append_df_to_excel(save_path, df, sheet_name="List", startrow=row + 1, startcol=col + 1, index=False)
        in_df_len = len(df)
        df = pd.DataFrame({"Port": out_port, "Destination": out_destination, "Description": out_description})
        append_df_to_excel(save_path, df, sheet_name="List", startrow=row + 1, startcol=col + 4, index=False)
        out_df_len = len(df)
        df_len = in_df_len if in_df_len > out_df_len else out_df_len
        row += df_len + 2
    return rows

def lambda_handler(event, context):
    save_path = '/tmp/sg_list.xlsx'
    if os.path.isfile(save_path):
        os.remove(save_path)
    
    now = datetime.datetime.now()
    year = now.strftime('%Y')
    month = now.strftime('%m')
    
    client = boto3.client('ec2', aws_access_key_id=event['access_key_id'], aws_secret_access_key=GetKey(event['access_key_id']))
    response = client.describe_security_groups()
    rows = describe_security_groups(save_path, response['SecurityGroups'])
    deco_excel(rows, save_path)
    ses(event['receiver'], save_path, year, month)

 

이전 소스와의 차이점인 append_df_to_excel 함수에 대해서 설명하도록 하겠습니다.

해당 함수는 구글링을 하여서 찾아낸 함수로 DF을 계속해서 층층이 쌓아서 만들어주는 함수입니다.

Security Group의 경우 In / Out Bound에 따라서 하나의 그룹마다 DF가 만들어지다보니

해당 함수를 사용하여서 하나의 DF를 만들고 엑셀에 쓰는 형태로 구성하였습니다.

 

알고리즘 로직을 짜는 실력이 부족하여 위와 같이 만들었으나,

똑똑하신 분들 께서 부디 Sort까지 한 형태로 깔끔하게 고쳐서 수정본을 알려주시는 날이 오면 좋겠습니다.