이야기박스

GCP; Cloud Pub/Sub 본문

Computer & Data/Cloud Platform

GCP; Cloud Pub/Sub

박스님 2019. 5. 23. 16:07
반응형

  

App Engine에서 Cloud Pub/Sub으로 메시지 전송이 필요하다 보니 기술조사를 시작하게 되었습니다. 카프카와 유사한 점이 많다고 하는데, 제가 카프카를 잘 몰라서 비교를 하지 못했습니다.

다음에 카프카도 공부하여 포스팅할 수 있도록 하겠습니다.

포스팅 내용은 Google 공식 문서를 참조하였습니다.

 

# 개요

Cloud Pub/Sub을 이용하면 전송자와 수신자를 분리하는 다대다 비동기식 메시지 서비스를 제공하기 때문에 독립적으로 작성한 애플리케이션 간의 안전하고 가용성 높은 통신이 가능합니다.  마이크로서비스를 위한 서비스라 생각하면 될 것 같습니다.

동작 방식은 단순합니다. 게시자(Publisher) 애플리케이션이 메시지를 만들어 주제(Topic)로 전달합니다. 구독자(Subscriber) 애플리케이션은 주제에 대한 구독을 만들어 해당 주제로부터 메시지를 수신합니다. 통신은 일대다(fan-in), 다대일(fan-out), 다대다 형태를 취합니다.

## 특징

  • 최소 1회 전송
  • 단 1회 처리
  • 프러비저닝 없는 완전 자동
  • 통합
  • 개방형
  • 글로벌 규모
  • 규정 준수 및 보안

## Pub/Sub 개념 및 메시지 흐름

  1. 게시자 애플리케이션이 Cloud Pub/Sub 서비스에서 주제를 생성하고 메시지를 주제로 전송합니다. 메시지에는 페이로드와 페이로드 콘텐츠를 설명하는 추가 속성이 포함됩니다.
  2. 메시지는 전송 후 구독자가 확인할 때까지 메시지 저장소에 남아 있습니다.
  3. Pub/Sub 서비스가 메시지를 주제에서 개별 구독으로 전송합니다. 각각의 구독은 Pub/Sub가 구독자가 선택한 엔드포인트로 내보내기하거나 구독자가 서비스에서 가져오기한 메시지를 수신합니다.
  4. 구독자는 대기 중인 메시지를 구독으로부터 수신하고, 각각의 메시지를 확인했음을 Pub/Sub 서비스에 알립니다.
  5. 구독자가 메시지를 확인하면 해당 메시지는 구독의 메시지 대기열에서 사라집니다.

## 게시자와 구독자

googleapis.com에 HTTPS를 요청할 수 있는 모든 애플리케이션은 게시자/구독자가 될 수 있습니다. Cloud Pub/Sub이 HTTPS 기반의 REST API를 제공하나 보네요.

 

# 아키텍처

Cloud Pub/Sub은 글로벌 서비스입니다. 클라이언트는 서버나 데이터의 물리적 위치를 알지 못하며, 전 세계 어느 곳에서나 게시나 구독을 전 세계 모든 곳으로 보낼 수 있습니다.

위 정의에서 한 가지 의문점이 생기는데, 게시자와 구독자의 물리적 위치가 다른 경우 응답 지연이 발생하지 않을까? 하는 점입니다. 이 부분에 대한 답은 아래 [제어 영역]이 줄 수 있을 것 같습니다.

Cloud Pub/Sub은 아래 두 가지 주요 영역으로 구분할 수 있습니다. 

## 데이터 영역

게시자가 메시지를 전송하고 이 메시지가 더 이상 서비스에 필요 없게 되는 순간

게시자와 구독자 간의 데이터 이동을 처리하는 영역입니다. 즉 메시지의 라이프사이클이라고 생각하시면 될 것 같습니다.

데이터 영역 내의 서버를 포워더(Forwarder)라고 부릅니다. 이 중 게시자와 연결된 것을 게시 포워더, 구독자와 연결된 부분을 구독 포워더라고 부릅니다.

게시 포워더는 게시된 메시지와 각 구독이 확인한 메시지를 설명하는 메타데이터를 모두 책임집니다. 그리고 특정 주제에 대해 수신하고 저장한 메시지 모음, 확인된 메시지에 대한 추적을 '게시 메시지 원본'이라고 부릅니다. 

구독 포워더는 구독자에게 메시지를 전달해주는 역할을 하게 됩니다. '게시 메시지 원본'이 있는 하나 이상의 게시 포워더에게 자신이 필요한 메시지를 요청하고 이를 구독자에게 전달합니다.

메시지가 구독자에게 전달되면 구독자는 이를 구독 포워더에 알리고, 구독 포워더는 다시 게시 포워더에게, 게시 포워더는 알림을 '게시 메시지 원본'에 저장합니다. 이렇게 주제에 대한 모든 구독이 메시지를 확인하면 메시지는 '게시 메시지 원본'과 저장소에서 비동기적으로 삭제됩니다.

## 제어 영역

게시자와 구독자를 데이터 영역의 포워더에 할당하는 작업, 즉 포워더에게 클라이언트를 배포하는 작업을 합니다. 

제어 영역에 있는 서버를 라우터라고 부르는데, 클라이언트가 Cloud Pub/Sub과 연결되면 라우터는 두 지점 간의 연결에 대한 최단 네트워크 거리를 기준으로 클라이언트를 연결할 데이터 센터를 결정합니다.

이 부분이 아까 위에서 나온 "클라이언트가 서버의 물리적 위치를 모른다"의 답이 될 수 있을 것 같습니다.


# Quotas

Cloud Pub/Sub의 할당량 및 한도에 대한 내용을 다루게 됩니다.

간단하게 설명하면, 사용량 최대치를 사용자가 설정할 수 있습니다. 측정은 "처리건수/minutes"으로 이루어지고 있습니다. 일반 계정으로 할당량 이상 사용 시 RESOURCE_EXHAUSTED 에러가 발생한다고 하네요. 이 문제를 피하려면 서비스 계정을 사용해야 한다고 합니다.

할당량의 한도를 사용자가 조정이 가능했던거에 비해, 리소스의 한도는 조정이 불가능합니다. 중요 한도 내용으로는 다음 내용이 있겠습니다.

  • Publish Request Size : 10MB
  • Unacknowledged messages : 7일 보관
  • 미사용 Subscription은 31일 후 삭제

자세한 내용은 여기를 참고하시면 됩니다.

 

# Pub/Sub on Java

제가 사용한 App Engine이 Flexible환경에서 Spring Boot를 제공하기 때문에 Java로 Pub/Sub을 사용하는 법을 정리하였습니다.

## Publisher

게시자 참고 문서

 

게시자 가이드  |  Cloud Pub/Sub  |  Google Cloud

게시자 애플리케이션이 메시지를 만들어 주제로 전달합니다. 구독자 개요에서 설명하는 것처럼, Cloud Pub/Sub는 기존 구독자에 최소 1회의 메시지 전달과 최선의 순서 지정을 제공합니다. 게시자 애플리케이션의 일반적인 흐름은 다음과 같습니다. 사용자 데이터를 포함하는 메시지를 생성합니다. 메시지를 원하는 주제에 게시하라는 요청을 Cloud Pub/Sub 서버에 전송합니다. 주제 생성 및 관리에 관한 자세한 내용은 주제 및 구독 관리를 참조하세요. 설정

cloud.google.com

메시지는 기본적으로 10MB보다 작아야 합니다. 언어에 따라 비동기/동기 지원 여부가 다른데, 자바의 경우 Future 객체를 사용하는 것을 보니 비동기가 지원이 되네요. 동기 방식으로 사용하고 싶으시다면, Future 객체에 get() 메서드를 사용하시면 됩니다. (이 부분은 카프카와 동일합니다.) 

ProjectTopicName topicName = ProjectTopicName.of("my-project-id", "my-topic-id");
Publisher publisher = null;

try {
  // Create a publisher instance with default settings bound to the topic
  publisher = Publisher.newBuilder(topicName).build();

  List<String> messages = Arrays.asList("first message", "second message");

  for (final String message : messages) {
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    // Once published, returns a server-assigned message id (unique within the topic)
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Add an asynchronous callback to handle success / failure
    ApiFutures.addCallback(future, new ApiFutureCallback<String>() {

      @Override
      public void onFailure(Throwable throwable) {
        if (throwable instanceof ApiException) {
          ApiException apiException = ((ApiException) throwable);
          // details on the API exception
          System.out.println(apiException.getStatusCode().getCode());
          System.out.println(apiException.isRetryable());
        }
        System.out.println("Error publishing message : " + message);
      }

      @Override
      public void onSuccess(String messageId) {
        // Once published, returns server-assigned message ids (unique within the topic)
        System.out.println(messageId);
      }
    });
  }
} finally {
  if (publisher != null) {
    // When finished with the publisher, shutdown to free up resources.
    publisher.shutdown();
    publisher.awaitTermination(1, TimeUnit.MINUTES);
  }
}

### BatchingSetting

Cloud Pub/Sub 라이브러리는 일괄 배치 전송이 default로 설정 되어 있습니다. App Engine과 같이 서버리스 이벤트 기반 애플리케이션의 경우, 이 옵션을 꺼두는 게 좋습니다.

// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1kb
long messageCountBatchSize = 10L; // default : 100

Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

// Publish request get triggered based on request size, messages count & time since last publish
BatchingSettings batchingSettings = BatchingSettings.newBuilder()
    .setElementCountThreshold(messageCountBatchSize)
    .setRequestByteThreshold(requestBytesThreshold)
    .setDelayThreshold(publishDelayThreshold)
    .build();

Publisher publisher = Publisher.newBuilder(topicName)
    .setBatchingSettings(batchingSettings).build();

### RetrySetting

재시도 요청 세팅도 마찬가지로 원하시는 값으로 작업해주시면 됩니다. 저는 우선 기본 값으로 유지하고 사용해볼 예정이에요.

// Retry settings control how the publisher handles retryable failures
Duration retryDelay = Duration.ofMillis(100); // default : 1 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures
Duration maxRetryDelay = Duration.ofSeconds(5); // default : 10 seconds
Duration totalTimeout = Duration.ofSeconds(1); // default: 0
Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 0
Duration maxRpcTimeout = Duration.ofSeconds(10); // default: 0

RetrySettings retrySettings = RetrySettings.newBuilder()
    .setInitialRetryDelay(retryDelay)
    .setRetryDelayMultiplier(retryDelayMultiplier)
    .setMaxRetryDelay(maxRetryDelay)
    .setTotalTimeout(totalTimeout)
    .setInitialRpcTimeout(initialRpcTimeout)
    .setMaxRpcTimeout(maxRpcTimeout)
    .build();

Publisher publisher = Publisher.newBuilder(topicName)
    .setRetrySettings(retrySettings).build();

### ExecutorThreadCount

동시 작업할 스레드를 조절하는 옵션입니다. 마찬가지로 상황에 맞게 구성하시면 될 것 같습니다.

// create a publisher with a single threaded executor
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
    .setExecutorThreadCount(1).build();
Publisher publisher = Publisher.newBuilder(topicName)
    .setExecutorProvider(executorProvider).build();

ExevutorProvider Interface 구현 받은 클래스들은 다음과 같이 있습니다. Java의 Executor와 비슷하네요.

  • AutoValue_InstantiatingExecutorProvider
  • FixedExecutorProvider
  • InstantiatingExecutorProvider

### Publisher Default Setting

Publisher 코드를 들어가보면 아래처럼 기본 설정들이 적용되어 있습니다.

static {
            DEFAULT_TOTAL_TIMEOUT = MIN_TOTAL_TIMEOUT;
            DEFAULT_BATCHING_SETTINGS = BatchingSettings.newBuilder().setDelayThreshold(DEFAULT_DELAY_THRESHOLD).setRequestByteThreshold(1000L).setElementCountThreshold(100L).build();
            DEFAULT_RETRY_SETTINGS = RetrySettings.newBuilder().setTotalTimeout(DEFAULT_TOTAL_TIMEOUT).setInitialRetryDelay(Duration.ofMillis(5L)).setRetryDelayMultiplier(2.0D).setMaxRetryDelay(Duration.ofMillis(9223372036854775807L)).setInitialRpcTimeout(DEFAULT_RPC_TIMEOUT).setRpcTimeoutMultiplier(2.0D).setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT).build();
            DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(5 * Runtime.getRuntime().availableProcessors()).build();
        }

## Subscriber

구독자 참고 문서

 

구독자 개요  |  Cloud Pub/Sub  |  Google Cloud

이 문서에서는 Cloud Pub/Sub에서 구독이 작동하는 방식을 개략적으로 설명합니다. 가져오기 및 내보내기 전달 구독에 대한 자세한 내용은 구독자 가져오기 가이드와 구독자 내보내기 가이드를 참조하세요. 주제에 게시된 메시지를 받으려면 해당 주제에 대한 구독을 생성해야 합니다. 구독자 애플리케이션은 구독이 생성된 이후에 주제에 게시된 메시지만 사용할 수 있습니다. 구독은 주제에 게시된 메시지를 수신하고 처리하는 구독자 애플리케이션과 주제를 연결합니다.

cloud.google.com

구독 관련 내용은 굉장히 길기 때문에, 코드는 생략하도록 하겠습니다. 위 참조 문서에 들어가시면 예제들이 많이 있으니 이를 참고하시면 좋을 것 같습니다.

### 구독 특징

Pub/Sub 특징에서 이야기하였듯이, Pub/Sub은 게시된 각 메시지를 모든 구독에 최소 한 번은 전송을 보장합니다. 하지만 이 1회 전송에도 예외는 존재합니다.

  • 최대 유지 기간인 7일 이내에 전달할 수 없는 메시지는 삭제되어 액세스 할 수 없게 됩니다. 이 현상은 대부분 구독자가 메시지 흐름을 따라잡지 못할 때 발생합니다.
  • 구독이 생성되기 전에 게시된 메시지는 대체로 전달되지 않습니다. 따라서 구독 없이 주제에 게시된 메시지는 검색이 불가능합니다

구독자는 ackDeadline을 설정하여 이 이내에 메시지를 확인할 수 있습니다.

구독을 하다보면 재처리된 메시지가 중복으로 들어오는 경우가 생길 수 있습니다. 이때, 구독자는 Cloud Dataflow의 PubsubIO를 이용하면 중복 메시지를 삭제할 수 있다고 하네요. 나중에 Cloud Dataflow도 포스팅을 다시 하도록 하겠습니다.

구독 방법에는 Pull과 Push 두 가지 방법이 존재하는데 아래에서 각각 설명하도록 하겠습니다.

###  Pull

구독자 애플리케이션이 Cloud Pub/Sub 서버에 메시지 검색을 요청하는 방식입니다.

대량 메시지 처리에 유리하기 때문에 메시지 처리 효율성과 처리량이 중요한 애플리케이션에 사용하시면 좋습니다.

사용자 인증이 확실한 서비스에서 이용하기 편한 서비스라고 하네요.

구독자 애플리케이션이 스스로 흐름제어를 진행해야 하고 공유 구독을 사용할 수 있게 구성할 수 있습니다.

 

### Push

Cloud Pub/Sub 서버가 구독자 애플리케이션에 메시지 수신을 요청하는 방식입니다.

App Engine과 같이 여러 주제를 webhook으로 처리하는 경우 유리하다고 합니다.

만약 App Engine이 Cloud Pub/Sub과 같은 GCP Project에 있다면 인증 절차가 필요 없지만, 다른 프로젝트에 있다면 인증 절차가 필요합니다.

기본적으로 Cloud Pub/Sub 서버가 흐름 제어를 제공해줍니다. 

 

# 마무리

Cloud Pub/Sub은 Google Cloud Platform의 여러 서비스를 하나로 이어주는 역할을 하는 서비스입니다. 이 것이 없었다면 많은 서비스가 마이크로서비스로 동작하기 어려웠을 것 같네요. 정말 고마운 서비스입니다.

Publishing 위주의 작업을 하다 보니, 전반적인 Pub/Sub 이해도가 아직도 부족한 편입니다. 다음에 다시 기회가 온다면 보다 깊게 공부하려고 합니다.

반응형

'Computer & Data > Cloud Platform' 카테고리의 다른 글

AWS Glue 실전  (0) 2019.08.13
GCP; App Engine  (0) 2019.05.30
GCP; Network Forwarding Rules  (0) 2019.05.20
GCP; Cloud Functions  (0) 2019.05.04
GCP Deployment, monitoring and alerting, and incident response  (0) 2019.04.05