안녕하세요. 저는 브랜디 R&D 본부 개발1팀의 기둥을 담당하는 이상근입니다. 오늘은 SQS(Simple Queue Service)와 Lambda를 간단한 예제와 함께 정리해보려고 합니다. 각 서비스에 대한 설명은 이미 매뉴얼로 쉽게 정리되어 있으므로, 이번 글에서는 서비스 간 구성을 집중적으로 살펴보겠습니다.1)
SQS(Simple Queue Service)는 마이크로 서비스와 분산 시스템, 그리고 서버리스 애플리케이션을 쉽게 분리하고 확장할 수 있는 ‘완전관리형 메시지 대기열 서비스’입니다. 그리고 Lambda는 ‘이벤트 처리 방식의 서버리스 컴퓨팅 서비스’입니다. 아래 그림은 SQS와 Lambda Function을 이용해 메시지를 등록-조회-처리하는데 필요한 구성요소를 정리한 것입니다.
이번에는 큐 생성에 대해 살펴보겠습니다. ‘Create New Queue’를 클릭했을 때 지역(Region)에 따라 각각 다른 화면이 노출됩니다.
두 번째 이미지와 같이 SQS에서는 Standard, FIFO 두 가지 타입을 제공하고 있습니다. 표준 대기열은 순서에 맞지 않게 메시지가 전송될 수 있습니다. 만약 순서를 반드시 유지해야 한다면 FIFO 대기열을 사용하거나, 순서 정보를 추가하고 사용해야 합니다.
하지만 FIFO 대기열의 경우 현재 미국 동부(버지니아 북부), 미국 동부(오하이오), 미국 서부(오레곤) 및 EU(아일랜드) 지역(Region)이서만 제공되고 있기 때문에 다른 곳에서는 사용할 수 없습니다. 2) 3)
1.Create New Queue
‘Create New Queue’에는 여러 항목이 있습니다. 우선 아래를 참조하여 각 항목에 적절한 내용을 기재합니다.
2.큐 등록 확인
기본 값으로 설정한 큐 등록을 확인합니다.
3.SQS 메시지 등록
import boto3, json
sqs_client = boto3.client(
service_name='sqs',
region_name='xxxxxx' )
SQS 메시지 등록
response = sqs_client.send_message(
QueueUrl='https://sqs.xxxxxx.amazonaws.com/xxxxxx/sqs-test-1',
MessageBody='메시지 내용' )
print(json.dumps(response))
{"MD5OfMessageBody": "xxxxxxx", "MessageId": "xxxxx-xxxx-xxxxxx", "ResponseMetadata": {"RequestId": "xxxxxxx", "HTTPStatusCode": 200, "HTTPHeaders": {"server": "Server", "date": "Fri, 09 Feb 2018 08:01:13 GMT", "content-type": "text/xml", "content-length": "378", "connection": "keep-alive", "x-amzn-requestid": "xxxxxxx"}, "RetryAttempts": 0}}
4.AWS Console 메시지 등록 확인
5.조회와 실행
1)SQS 메시지를 조회합니다.
2)LambdaWorker 함수를 실행하고 > InvocationType으로 동기, 비동기 등의 실행 유형을 설정합니다.
import boto3, json
def handle(event, context):
queue_url = 'https://sqs.xxxxxx.amazonaws.com/xxxxxx/sqs-test-1'
sqs_client = boto3.client(
service_name='sqs',
region_name='xxxxxx'
)
lambda_client = boto3.client(
service_name='lambda',
region_name='ap-northeast-1'
)
# SQS 메시지 조회
response = sqs_client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
AttributeNames=[
'All'
]
)
print(json.dumps(response))
# {"Messages": [{"MessageId": "xxxxx-xxxx-xxxxxx", "ReceiptHandle": "xxxxx-xxxx-xxxxxx", "MD5OfBody": "xxxxxxx", "Body": "\uba54\uc2dc\uc9c0 \ub0b4\uc6a9", "Attributes": {"SenderId": "xxxxxxx", "ApproximateFirstReceiveTimestamp": "1518163931724", "ApproximateReceiveCount": "1", "SentTimestamp": "1518163466941"}}], "ResponseMetadata": {"RequestId": "", "HTTPStatusCode": 200, "HTTPHeaders": {"server": "Server", "date": "Fri, 09 Feb 2018 08:12:11 GMT", "content-type": "text/xml", "content-length": "1195", "connection": "keep-alive", "x-amzn-requestid": "xxxxxxx"}, "RetryAttempts": 0}}
for message in response['Messages']:
payload = {'message': message, 'queueUrl': queue_url}
# Lambda Worker 함수 실행
lambda_client.invoke(
FunctionName='lambda_worker',
InvocationType='Event',
Payload=json.dumps(payload)
)
6.Lambda Consumer 함수 등록
Execution role : SQS ReceiveMessage, Lambda InvokeFunction, CloudWatchLogs
7.확인-실행-삭제
1) 이벤트로 넘어온 메시지 내용을 확인하고
2) 메시지 프로세스를 실행한 후
3) SQS 메시지를 삭제합니다.
import boto3, json
def handle(event, context):
sqs_client = boto3.client(
service_name='sqs',
region_name='xxxxxx'
)
message_body = json.loads(event['message']['Body'])
queue_url = event['queueUrl']
receipt_handle = event['message']['ReceiptHandle']
###############
# 큐 메시지 처리
###############
# SQS 메시지 삭제
sqs_client.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
8.Lambda Worker 함수 등록
Execution role : SQS DeleteMessage, CloudWatchLogs
9.CloudWatch의 Event Rule 등록
10.1분에 한 번씩 지정한 Lambda 함수를 실행하여 SQS 메시지 확인
참고)이것만은 꼭 알아두세요!
여러 대의 서버에 메시지 사본을 저장하기 때문에 가끔씩 메시지 사본을 받거나 삭제하는 중엔 저장 서버 중 하나를 사용할 수 없을 수도 있다고 합니다. 이 경우, 해당 문제가 발생하면 사용할 수 없는 서버의 메시지가 삭제되지 않아, 메시지를 다시 가져와야 하는 문제가 생길 수 있습니다. 그러므로 애플리케이션에서 동일 메시지를 두 번 이상 처리하는 것도 대비해야 합니다.
지금까지 AWS 환경에서 SQS, Lambda, CloudWatch EventRule을 이용한 메시지 대기열 등록과 처리에 대한 기본 예제들을 실행해봤습니다. AWS의 다른 서비스들과 같이 아주 간단한 방법으로 메시지 대기열을 이용할 수 있었습니다. 오늘 살펴본 방법들을 활용하면 동영상 트랜스 코딩 등의 작업을 비롯해 분산 애플리케이션 간의 데이터 처리에도 유용하게 사용할 수 있을 겁니다.
ps.아마존 형님들의 IT 인프라를 이용하여 편하게 개발에만 집중합시다.
참고
1) 각 서비스 매뉴얼은 아래 페이지 링크 참조하면 된다.
2)2018년 2월 기준이다.
3)표준 대기열과 FIFO 대기열의 특징은 아래와 같으며 자세한 내용은 매뉴얼에 정리되어 있다.
글
이상근 팀장 | R&D 개발1팀
leesg@brandi.co.kr
브랜디, 오직 예쁜 옷만
#브랜디 #개발문화 #개발팀 #업무환경 #인사이트 #경험공유
관련 스택