본문 바로가기

DevOps/Slack & API Gateway

Slack Slash Command를 통해 AWS 서버 스케줄링하기 - 2. Lambda

가장 먼저, Token 값을 인증할 람다함수를 만들겠습니다.

슬랙 앱을 구성하고 맨 처음에 있는 토큰 값을 가지고 인증할 것입니다.

import json
import boto3

def check_token(token):
    token_list = ["Slack Verify Token"]
    if token in token_list:
        return True
    else:
        return False

def lambda_handler(event, context):
    print(event)
    token = event['token']
    client = boto3.client('lambda')
    if check_token(token):
        response = client.invoke_async(
            FunctionName='slack-slash-command-'+event['command'][1:].lower(),
            InvokeArgs=json.dumps(event)
        )
        return {
            # "response_type": "in_channel",
            "text": "Check the Slack Token...\nSuccess Authentication!\nIt takes few seconds to process..."
        }
    else:
        return {
            # "response_type": "in_channel",
            "text": "Check the Slack Token...\nFailed Authentication\nPlease Check the Slash Command App Setting"
        }

 

위와 같이 함수를 구성한 이유를 하나씩 말씀드리겠습니다.

먼저 슬래쉬 커맨드를 수행하게 되면 아래와 같이 람다에 전달됩니다.

 

{
	'token': 'Slack Verify Token', 
    'team_id': '@@@@', 
    'team_domain': '@@@@@', 
    'channel_id': '@@@@', 
    'channel_name': 'jenkins', 
    'user_id': '@@@@@', 
    'user_name': 'hyunho129', 
    'command': '/ag', 
    'text': 'describe', 
    'response_url': 'https://hooks.slack.com/commands/@@@@/@@@@', 
    'trigger_id': '922736629300.545631242689.1d72438f7c9d9c1e737c0ca8fc62db38'
}

 

따라서 토큰값을 가지고 인증을 해서 슬랙에서 온 올바른 요소이면 함수를 수행하는데,

말씀드렸듯이 3000ms 이내에 선 응답을 해줘야하기 때문에 Async로 ec2 혹은 auto scaling group에 대한 Command라면

해당 이름을 맞춰 함수를 수행하게 끔 만들었습니다.

 

그래서 저는 ec2나 autoscaling group을 컨트롤 하는 함수의 이름을 

커맨드는 /ec2, /ag 람다 함수의 이름은 slack-slash-command-ec2, slack-slash-command-ag 으로 구성하였습니다.

 

이제 두 함수를 만들어보겠습니다.

slack-slash-command-75453822-7715-4fb1-9229-26c4742392a8.zip
0.01MB

위의 패키지 파일을 받아서 Layer를 구성해주세요.

 

저는 한 슬랙 채널에서 여러 계정을 컨트롤 하게 구성하기 때문에 (한 사이트의 멀티어카운트 구성이 많음)

아래와 같이 ssm에 토큰값을 가지고 access key, secret key를 ssm parameter store에 저장하여 사용합니다.

import json
import boto3
import slackweb

def slack_send(url, message):
    slack = slackweb.Slack(url=url)
    # slack.notify(response_type="in_channel", text=message)
    slack.notify(text=message)

def get_client(token):
    ssm = boto3.client('ssm')
    response = ssm.get_parameter(Name=token, WithDecryption=True)
    key = response['Parameter']['Value']
    key = key.split(',')
    return boto3.client(
        'ec2',
        aws_access_key_id=key[0],
        aws_secret_access_key=key[1]
    )

def status(client, tag_key, value):
    info = ""
    ec2_list = client.describe_instances()
    for ec2 in ec2_list['Reservations']:
        name = ""
        resource_env = ""
        ec2_info = ec2['Instances'][0]
        try:
            for tag in ec2_info['Tags']:
                if tag['Key'] == "Name":
                    name = tag['Value']
                elif tag['Key'] == 'Env':
                    resource_env = tag['Value']
            if tag_key == "all":
                info += name + "\t" + ec2_info['State']['Name'] + "\t" + ec2_info['PrivateIpAddress'] + "\t" + ec2_info['InstanceType'] + "\t" + resource_env +"\n"
            else:
                for tag in ec2_info['Tags']:
                    if tag_key.lower() == tag['Key'].lower():
                        if value.lower() in tag['Value'].lower():
                            info += name + "\t" + ec2_info['State']['Name'] + "\t" + ec2_info['PrivateIpAddress'] + "\t" + ec2_info['InstanceType'] + "\t" + resource_env +"\n"
        except :
            continue
    print(info)
    return info

def stop(client, tag_key, value):
    response = client.describe_instances()
    ec2_list = []
    for ec2 in response['Reservations']:
        for instance in ec2['Instances']:
            if tag_key == "all":
                ec2_list.append(instance['InstanceId'])
            else:
                try:
                    for tags in instance['Tags']:
                        if tags['Key'].upper() == tag_key.upper():
                            if value.upper() in tags['Value'].upper():
                                print(tags['Value'] + ':' + instance['InstanceId'])
                                ec2_list.append(instance['InstanceId'])
                except :
                    continue
    stop_response = client.stop_instances(InstanceIds=ec2_list)
    stop_instances = stop_response['StoppingInstances']
    result = ""
    for stop_instance in stop_instances:
        result += stop_instance['InstanceId'] + " " + stop_instance['CurrentState']['Name'] + "\n"
    return result

def start(client, tag_key, value):
    response = client.describe_instances()
    ec2_list = []
    for ec2 in response['Reservations']:
        for instance in ec2['Instances']:
            if tag_key == "all":
                ec2_list.append(instance['InstanceId'])
            else:
                try:
                    for tags in instance['Tags']:
                        if tags['Key'].upper() == tag_key.upper():
                            if value.upper() in tags['Value'].upper():
                                print(tags['Value'] + ':' + instance['InstanceId'])
                                ec2_list.append(instance['InstanceId'])
                except:
                    continue
    start_response = client.start_instances(InstanceIds=ec2_list)
    start_instances = start_response['StartingInstances']
    result = ""
    for start_instance in start_instances:
        result += start_instance['InstanceId'] + " " + start_instance['CurrentState']['Name'] + "\n"
    return result
    
def help():
    message = "***EC2 Commnand 사용법***\n"\
    "명령어 문법은 다음과 같습니다 -> '/ec2 command tag-key tag-value'\n"\
    "ex1) '/ec2 status env dev' -> Env 태그 값에 dev가 포함된 EC2 List를 가져옵니다.\n"\
    "ex2) '/ec2 start name phh' -> Name 태그 값에 phh가 포함된 EC2 서버들을 구동시킵니다.\n"\
    "ex3) '/ec2 status(or start) all -> 모든 EC2 List를 가져옵니다. (or 구동시킵니다.)\n"\
    "status tag value : 해당 tag의 값에 value값이 포함된 EC2 List를 가져옵니다.\n"\
    "start tag value : 해당 tag의 값에 value값이 포함된 EC2 서버들을 구동시킵니다.\n"\
    "stop tag value : 해당 tag의 값에 value값이 포함된 EC2 서버들을 정지시킵니다."
    print(message)
    return message

def do_act(token, act):
    client = get_client(token)
    function = act[0]
    if len(act) >= 2:
        tag_key = act[1]
        if tag_key == "all":
            value = "all"
        elif tag_key == "help":
            value = ""
        else:
            value = act[2]
    if function == "status":
        return status(client, tag_key, value)
    elif function == "start":
        return start(client, tag_key, value)
    elif function == "stop":
        return stop(client, tag_key, value)
    elif function == "help":
        return help()
    else:
        return "Please Check the function"

def lambda_handler(event, context):
    act = event['text'].split(' ')
    print(act)
    result = do_act(event['token'], act)
    slack_send(event['response_url'], result)

 

다음은 AutoScaling Group을 컨트롤하는 함수입니다.

 

import json
import boto3
import slackweb

def slack_send(url, message):
    slack = slackweb.Slack(url=url)
    # slack.notify(response_type="in_channel", text=message)
    slack.notify(text=message)

def get_client(service_name, token):
    ssm = boto3.client('ssm', region_name="ap-northeast-1")
    response = ssm.get_parameter(Name=token, WithDecryption=True)
    key = response['Parameter']['Value']
    key = key.split(',')
    return boto3.client(
        service_name,
        aws_access_key_id=key[0],
        aws_secret_access_key=key[1],
        region_name="ap-northeast-1"
    )

def ec2_describe(token, ec2_id_list):
    info = ""
    client = get_client('ec2', token)
    ec2_list = client.describe_instances(InstanceIds=ec2_id_list)
    for ec2 in ec2_list['Reservations']:
        name = ""
        resource_env = ""
        ec2_info = ec2['Instances'][0]
        try:
            for tag in ec2_info['Tags']:
                if tag['Key'] == "Name":
                    name = tag['Value']
                elif tag['Key'] == 'Env':
                    resource_env = tag['Value']
            info += name + "\t" + ec2_info['State']['Name'] + "\t" + ec2_info['PrivateIpAddress'] + "\t" + ec2_info['InstanceType'] + "\t" + resource_env +"\n"
        except :
            continue
    print("EC2 List\n" + info)
    return info

def describe(client, token):
    info = ""
    ag_list = client.describe_auto_scaling_groups()
    for ag in ag_list['AutoScalingGroups']:
        ag_name = ag['AutoScalingGroupName']
        ag_des_capacity = ag['DesiredCapacity']
        ag_min_capacity = ag['MinSize']
        ag_max_capacity = ag['MaxSize']
        info += ag_name + "\t" + str(ag_des_capacity) + "\t" + str(ag_min_capacity) + "\t" + str(ag_max_capacity) + "\n"
        ec2_list = ag['Instances']
        try:
            ec2_id_list=[]
            for ec2 in ec2_list:
                ec2_id_list.append(ec2['InstanceId'])
            info += ec2_describe(token, ec2_id_list)
        except :
            continue
    print(info)
    return info

def update(client, name, desire_capacity, min_capacity, max_capacity):
    response = client.update_auto_scaling_group(
        AutoScalingGroupName=name,
        MinSize=int(min_capacity),
        MaxSize=int(max_capacity),
        DesiredCapacity=int(desire_capacity)
    )
    print(response)
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        return "Success Update Autoscaling Group"
    else:
        return "Failed Update Autoscaling Group"
    
def help():
    message = "*** AG Commnand 사용법 ***\n"\
    "명령어 문법은 다음과 같습니다 -> '/ag command params'\n"\
    "ex1) '/ag describe' -> Auto Scailing Group을 나열합니다.\n"\
    "ex2) '/ag update eks-f6b7904b-daee-949a-2805-e807f1c1bac4 2 2 6' -> Auto Scailing Group의 용량을 목표2 최소2 최대6 수정\n"\
    "describe : 모든 Auto Scailing Group과 포함된 EC2 List를 가져옵니다.\n"\
    "update name desire_capacity min_capacity max_capacity : 오토스케일링 그룹을 업데이트 합니다."
    print(message)
    return message

def do_act(token, act):
    client = get_client('autoscaling', token)
    function = act[0]
    if function == "describe":
        return describe(client, token)
    elif function == "update":
        return update(client, act[1], act[2], act[3], act[4])
    elif function == "help":
        return help()
    else:
        return "Please Check the function"

def lambda_handler(event, context):
    act = event['text'].split(' ')
    print(act)
    result = do_act(event['token'], act)
    slack_send(event['response_url'], result)
    
    

 

이 함수들을 모두 만들었으면 다음 포스터에서 API Gateway를 구성하고 Slack Slash Command 구성을 끝내보겠습니다.