상세 컨텐츠

본문 제목

Go의 이벤트 중심 아키텍처(golang)

Dev Type

by ai developer 2024. 4. 16. 21:14

본문

RabbitMQ, 도메인 중심 설계 및 클린 아키텍처 사용

이벤트 중심 아키텍처란 무엇입니까?

EDA(이벤트 중심 아키텍처)는 애플리케이션의 느슨한 결합, 확장성, 독립성 및 복원성을 활성화하기 위해 이벤트의 생성, 감지 및 소비를 강조하는 소프트웨어 디자인 패턴입니다.

이벤트란 무엇인가요?

주황색 포스트로 표시되는 이벤트는 과거 에 발생한 중요한 사실이며 일반적으로 UserCreated, OrderPaid, InvoiceCanceled 등과 같은 명령을 실행한 결과입니다.

명령이란 무엇입니까?

파란색 포스트로 표시되는 명령은 시스템의 작업 또는 의도이며 항상 명령형 모드입니다(예: CreateUser, PlaceOrder, PayOrder, GetProducts 등).

배우란 무엇인가?

노란색 포스트로 표시되는 행위자는 명령을 실행하는 사람입니다(예: 사용자, 고객, 판매자, 구매자, 시스템 등).

어디서부터 시작하나요?

이벤트 모델링 !!!

문제:

내가 제안한 솔루션:

직접 체험해 보세요

폴더 구조(여기서는 Clean Arch 및 DDD 레이어 개념을 사용하고 있습니다)

이벤트 만들기

내부/도메인/이벤트 폴더  이벤트 order_created_event.go를 생성합니다.

package event

type OrderCreatedEvent struct {
 Id         string
 Items      []OrderItem
 TotalPrice float64
 Status     string
}

type OrderItem struct {
 ProductName string
 Quantity    int
 TotalPrice  float64
}

내부/도메인/이벤트 폴더  이벤트 order_paid_event.go 생성

package event

import (
 "time"
)

type OrderPaidEvent struct {
 OrderId     string
 PaidValue   float64
 PaymentDate time.Time
}

엔터티 만들기

먼저 uuid 패키지를 사용하여 ID를 생성하고 셸에서 실행합니다.

go mod init eda-example
go get github.com/google/uuid

내부/도메인/엔티티 폴더  엔터티 order_item_entity.go를 생성합니다.

package entity

type OrderItemEntity struct {
 productName  string
 productPrice float64
 quantity     int
}

func NewOrderItemEntity(productName string, productPrice float64, quantity int) *OrderItemEntity {
 return &OrderItemEntity{
  productName:  productName,
  productPrice: productPrice,
  quantity:     quantity,
 }
}

// getters
func (o *OrderItemEntity) GetProductName() string {
 return o.productName
}

func (o *OrderItemEntity) GetProductPrice() float64 {
 return o.productPrice
}

func (o *OrderItemEntity) GetQuantity() int {
 return o.quantity
}

func (o *OrderItemEntity) GetTotalPrice() float64 {
 return o.productPrice * float64(o.quantity)
}

내부/도메인/엔티티 폴더  엔터티 order_entity.go를 생성합니다.

package entity

import (
 "errors"
 "github.com/google/uuid"
)

const (
 OrderStatusPending = "pending"
 OrderStatusPaid    = "paid"
)

type OrderEntity struct {
 id         string
 status     string
 items      []*OrderItemEntity
 totalPrice float64
 paidValue  float64
}

func NewOrderEntity() (*OrderEntity, error) {
 return &OrderEntity{
  id:     uuid.New().String(),
  status: OrderStatusPending,
 }, nil
}

// to populate with an existing order
func RestoreOrderEntity(id, status string) (*OrderEntity, error) {
 return &OrderEntity{
  id:     id,
  status: status,
 }, nil
}

func (o *OrderEntity) AddItem(item *OrderItemEntity) {
 o.items = append(o.items, item)
 o.totalPrice += item.GetTotalPrice()
}

func (o *OrderEntity) Pay(value float64) error {
 if value < o.totalPrice {
  return errors.New("value is less than the total price")
 }
 o.paidValue = value
 o.status = OrderStatusPaid
 return nil
}

// getters
func (o *OrderEntity) GetItems() []*OrderItemEntity {
 return o.items
}

func (o *OrderEntity) GetTotalPrice() float64 {
 return o.totalPrice
}

func (o *OrderEntity) GetID() string {
 return o.id
}

func (o *OrderEntity) GetStatus() string {
 return o.status
}

게시자 인터페이스 만들기

내부/도메인/대기열 폴더  인터페이스 publisher.go를 생성합니다.

package queue

import "context"

type Publisher interface {
 Publish(ctx context.Context, body interface{}) error
}

DTO 생성

내부/응용 프로그램/dto 폴더  dto create_order_dto.go를 생성합니다.

package dto

type CreateOrderDTO struct {
 Items []Item `json:"items"`
}

type Item struct {
 ProductId string `json:"product_id"`
 Qtd       int    `json:"qtd"`
}

명령(또는 사용 사례) 만들기

내부/애플리케이션/유스케이스 폴더 에 유스케이스 create_order_usecase.go 생성

package usecase

import (
 "context"
 "fmt"
 "eda-example/internal/application/dto"
 "eda-example/internal/domain/entity"
 "eda-example/internal/domain/event"
 "eda-example/internal/domain/queue"
)

type CreateOrderUseCase struct {
 publisher queue.Publisher
}

func NewCreateOrderUseCase(publisher queue.Publisher) *CreateOrderUseCase {
 return &CreateOrderUseCase{
  publisher: publisher,
 }
}

func (u *CreateOrderUseCase) Execute(ctx context.Context, input dto.CreateOrderDTO) error {
 fmt.Println("--- CreateOrderUseCase ---")

 // create order
 order, err := entity.NewOrderEntity()
 if err != nil {
  return err
 }

 for _, item := range input.Items {
  // TODO: find product in the repository database here
  fakeProductName := "Product " + item.ProductId
  fakeProductPrice := 10.50

  // create fake order item
  i := entity.NewOrderItemEntity(fakeProductName, fakeProductPrice, item.Qtd)

  // add items to order
  order.AddItem(i)
 }

 // TODO: save the order in the repository database here

 var eventItems []event.OrderItem
 for _, item := range order.GetItems() {
  eventItems = append(eventItems, event.OrderItem{
   ProductName: item.GetProductName(),
   TotalPrice:  item.GetTotalPrice(),
   Quantity:    item.GetQuantity(),
  })
 }

 // publish event OrderCreatedEvent passing the order data
 err = u.publisher.Publish(ctx, event.OrderCreatedEvent{
  Id:         order.GetID(),
  TotalPrice: order.GetTotalPrice(),
  Status:     order.GetStatus(),
  Items:      eventItems,
 })
 if err != nil {
  return err
 }
 return nil
}

컨트롤러 생성

내부/응용 프로그램/컨트롤러 폴더  컨트롤러 order_controller.go를 생성합니다.

package controller

import (
 "eda-example/internal/application/dto"
 "eda-example/internal/application/usecase"
 "encoding/json"
 "net/http"
)

type OrderController struct {
 createOrderUserCase        *usecase.CreateOrderUseCase
}

func NewOrderController(createOrderUserCase *usecase.CreateOrderUseCase) *OrderController {
 return &OrderController{
  createOrderUserCase:        createOrderUserCase,
 }
}

func (u *OrderController) CreateOrder(w http.ResponseWriter, r *http.Request) {
 var requestData dto.CreateOrderDTO
 json.NewDecoder(r.Body).Decode(&requestData)
 err := u.createOrderUserCase.Execute(r.Context(), requestData)
 if err != nil {
  w.WriteHeader(http.StatusInternalServerError)
  w.Write([]byte(err.Error()))
  return
 }
 w.WriteHeader(http.StatusCreated)
}

MemoryQueueAdapter 만들기

내부/ 인프라 /queue 폴더  memory_queue_adapter.go 생성

package queue

import (
 "context"
 "encoding/json"
 "log"
 "reflect"
)

type MemoryQueueAdapter struct {
}

func NewMemoryQueueAdapter() *MemoryQueueAdapter {
 return &MemoryQueueAdapter{}
}

func (eb *MemoryQueueAdapter) Publish(ctx context.Context, eventPayload interface{}) error {
 eventType := reflect.TypeOf(eventPayload)
 payloadJson, _ := json.Marshal(eventPayload)
 log.Printf("** [Publish] %s: %v ---", eventType, payloadJson)
 return nil
}

애플리케이션 진입점/시작 생성

cmd/api 폴더  main.go 생성

package main

import (
 "eda-example/internal/application/controller"
 "eda-example/internal/application/usecase"
 "eda-example/internal/infra/queue"
 "fmt"
 "net/http"
)

func main() {
 // initialize dependencies and implementations
 queue := queue.NewMemoryQueueAdapter()
 createOrderUseCase := usecase.NewCreateOrderUseCase(queue)
 orderController := controller.NewOrderController(createOrderUseCase)

 // register routes
 http.HandleFunc("POST /create-order", orderController.CreateOrder)

 // start server
 fmt.Println("Server is running on port 8080")
 http.ListenAndServe(":8080", nil)
}

애플리케이션 실행

/create-order 엔드포인트 테스트

우편 배달부에서 요청 보내기:

게시된 로깅 이벤트를 확인하세요.

지금까지의 요약

주문을 생성하기 위해 엔드포인트를 생성합니다. 주문을 생성한 직후에는 주문이 생성되었음을 알리는 이벤트가 게시됩니다.

 

이제 이 이벤트를 수신 하고 여기에서 다른 명령(ProcessOrderPayment, StockMovement 및 SendOrderEmail)을 실행 해야 합니다.

리스너 명령 만들기

내부/애플리케이션/유즈케이스 폴더  process_order_pay_usecase.go 생성

package usecase

import (
 "context"
 "fmt"
 "time"
 "eda-example/internal/domain/entity"
 "eda-example/internal/domain/event"
 "eda-example/internal/domain/queue"
)

type ProcessOrderPaymentUseCase struct {
 publisher queue.Publisher
}

func NewProcessOrderPaymentUseCase(publisher queue.Publisher) *ProcessOrderPaymentUseCase {
 return &ProcessOrderPaymentUseCase{
  publisher: publisher,
 }
}

func (h *ProcessOrderPaymentUseCase) Execute(ctx context.Context, payload *event.OrderCreatedEvent) error {
 fmt.Println("--- ProcessOrderPaymentUseCase ---")

 // TODO: find order by id in the repository database here
 order, err := entity.RestoreOrderEntity(payload.Id, payload.Status)
 if err != nil {
  return err
 }

 for _, i := range payload.Items {
  item := entity.NewOrderItemEntity(i.ProductName, i.TotalPrice/float64(i.Quantity), i.Quantity)
  order.AddItem(item)
 }

 paymentValue := payload.TotalPrice
 err = order.Pay(paymentValue)
 if err != nil {
  return err
 }

 fmt.Printf("Order Paid. Value: %f \n", payload.TotalPrice)
 err = h.publisher.Publish(ctx, event.OrderPaidEvent{OrderId: payload.Id, PaidValue: paymentValue, PaymentDate: time.Now()})
 if err != nil {
  return err
 }
 return nil
}

 

내부/애플리케이션/유스케이스 폴더  stock_movement_usecase.go 생성

package usecase

import (
 "context"
 "eda-example/internal/domain/event"
 "fmt"
)

type StockMovementUseCase struct {
}

func NewStockMovementUseCase() *StockMovementUseCase {
 return &StockMovementUseCase{}
}

func (h *StockMovementUseCase) Execute(ctx context.Context, payload *event.OrderCreatedEvent) error {
 fmt.Println("--- StockMovementUseCase ---")
 for _, item := range payload.Items {
  fmt.Printf("Removing %d items of product %s from stock\n", item.Quantity, item.ProductName)
 }
 return nil
}

 

내부/애플리케이션/유즈케이스 폴더  send_orderemail_usecase.go 생성

package usecase

import (
 "context"
 "eda-example/internal/domain/event"
 "fmt"
)

type SendOrderEmailUseCase struct {
}

func NewSendOrderEmailUseCase() *SendOrderEmailUseCase {
 return &SendOrderEmailUseCase{}
}

func (h *SendOrderEmailUseCase) Execute(ctx context.Context, payload *event.OrderCreatedEvent) error {
 fmt.Println("--- SendOrderEmailUseCase ---")
 fmt.Printf("--- MAIL Order Created: R$ %f \n", payload.TotalPrice)
 return nil
}

OrderController에서 리스너 핸들러 생성

내부/응용 프로그램/컨트롤러 폴더 의 order_controller.go  ProcessOrderPayment , StockMovement  SendOrderEmail 메서드를 만듭니다 .

package controller

import (
 "encoding/json"
 "net/http"
 "eda-example/internal/application/dto"
 "eda-example/internal/application/usecase"
 "eda-example/internal/domain/event"
)

type OrderController struct {
 createOrderUserCase        *usecase.CreateOrderUseCase
 processOrderPaymentUseCase *usecase.ProcessOrderPaymentUseCase
 stockMovementUseCase       *usecase.StockMovementUseCase
 sendOrderEmailUseCase      *usecase.SendOrderEmailUseCase
}

func NewOrderController(createOrderUserCase *usecase.CreateOrderUseCase,
 processOrderPaymentUseCase *usecase.ProcessOrderPaymentUseCase,
 stockMovementUseCase *usecase.StockMovementUseCase,
 sendOrderEmailUseCase *usecase.SendOrderEmailUseCase) *OrderController {
 return &OrderController{
  createOrderUserCase:        createOrderUserCase,
  processOrderPaymentUseCase: processOrderPaymentUseCase,
  stockMovementUseCase:       stockMovementUseCase,
  sendOrderEmailUseCase:      sendOrderEmailUseCase,
 }
}

func (u *OrderController) CreateOrder(w http.ResponseWriter, r *http.Request) {
 var requestData dto.CreateOrderDTO
 json.NewDecoder(r.Body).Decode(&requestData)
 err := u.createOrderUserCase.Execute(r.Context(), requestData)
 if err != nil {
  w.WriteHeader(http.StatusInternalServerError)
  w.Write([]byte(err.Error()))
  return
 }
 w.WriteHeader(http.StatusCreated)
}

func (u *OrderController) ProcessOrderPayment(w http.ResponseWriter, r *http.Request) {
 var body event.OrderCreatedEvent
 json.NewDecoder(r.Body).Decode(&body)
 err := u.processOrderPaymentUseCase.Execute(r.Context(), &body)
 if err != nil {
  w.WriteHeader(http.StatusInternalServerError)
  w.Write([]byte(err.Error()))
  return
 }
 w.WriteHeader(http.StatusCreated)
}

func (u *OrderController) StockMovement(w http.ResponseWriter, r *http.Request) {
 var body event.OrderCreatedEvent
 json.NewDecoder(r.Body).Decode(&body)
 err := u.stockMovementUseCase.Execute(r.Context(), &body)
 if err != nil {
  w.WriteHeader(http.StatusInternalServerError)
  w.Write([]byte(err.Error()))
  return
 }
 w.WriteHeader(http.StatusCreated)
}

func (u *OrderController) SendOrderEmail(w http.ResponseWriter, r *http.Request) {
 var body event.OrderCreatedEvent
 json.NewDecoder(r.Body).Decode(&body)
 err := u.sendOrderEmailUseCase.Execute(r.Context(), &body)
 if err != nil {
  w.WriteHeader(http.StatusInternalServerError)
  w.Write([]byte(err.Error()))
  return
 }
 w.WriteHeader(http.StatusCreated)
}

리스너 구조체 생성

내부/ 인프라 /queue 폴더  listener.go 생성

package queue

import (
 "net/http"
 "reflect"
)

type Listener struct {
 eventType reflect.Type
 callback  func(w http.ResponseWriter, r *http.Request)
}

QueueResponseWriter 구조체 생성

내부/ 인프라 /queue 폴더  response_writer.go 생성

package queue

import (
 "fmt"
 "net/http"
)

type QueueResponseWriter struct {
 body       []byte
 statusCode int
 header     http.Header
}

func NewQueueResponseWriter() *QueueResponseWriter {
 return &QueueResponseWriter{
  header: http.Header{},
 }
}

func (w *QueueResponseWriter) Header() http.Header {
 return w.header
}

func (w *QueueResponseWriter) Write(b []byte) (int, error) {
 w.body = b
 return 0, nil
}

func (w *QueueResponseWriter) WriteHeader(statusCode int) {
 w.statusCode = statusCode
}

var okFn = func(w http.ResponseWriter, r *http.Request) {
 w.WriteHeader(http.StatusOK)
}

func main() {
 r := &http.Request{
  Method: http.MethodPost,
 }
 w := NewQueueResponseWriter()
 okFn(w, r)
 fmt.Println(w.statusCode)
}

대기열 인터페이스 생성

내부/도메인/queue 폴더  queue.go 생성

package queue

import (
 "context"
 "net/http"
 "reflect"
)

type Queue interface {
 ListenerRegister(eventType reflect.Type, handler func(w http.ResponseWriter, r *http.Request))
 Connect(ctx context.Context) error
 Disconnect(ctx context.Context) error
 Publish(ctx context.Context, body interface{}) error
 StartConsuming(ctx context.Context, queueName string) error
}

MemoryQueueAdapter에서 메서드 구현

내부/ 인프라 /queue 폴더 의 memory_queue_adapter.go 에 메소드 구현

package queue

import (
 "bytes"
 "context"
 "encoding/json"
 "log"
 "net/http"
 "reflect"
)

type MemoryQueueAdapter struct {
 listeners map[string][]Listener
}

func NewMemoryQueueAdapter() *MemoryQueueAdapter {
 return &MemoryQueueAdapter{
  listeners: make(map[string][]Listener),
 }
}

func (eb *MemoryQueueAdapter) ListenerRegister(eventType reflect.Type, handler func(w http.ResponseWriter, r *http.Request)) {
 eb.listeners[eventType.Name()] = append(eb.listeners[eventType.Name()], Listener{eventType, handler})
}

func (eb *MemoryQueueAdapter) Publish(ctx context.Context, eventPayload interface{}) error {
 eventType := reflect.TypeOf(eventPayload)
 payloadJson, _ := json.Marshal(eventPayload)

 log.Printf("--- Publish %s ---", eventType)

 for _, listener := range eb.listeners[eventType.Name()] {
  w := NewQueueResponseWriter()
  body := bytes.NewBuffer(payloadJson)
  r, err := http.NewRequestWithContext(ctx, http.MethodPost, eventType.Name(), body)
  if err != nil {
   return err
  }

  listener.callback(w, r)
  if err != nil {
   return err
  }
 }

 return nil
}

func (eb *MemoryQueueAdapter) Connect(ctx context.Context) error {
 log.Println("--- MemoryQueueAdapter connected ---")
 return nil
}

func (eb *MemoryQueueAdapter) Disconnect(ctx context.Context) error {
 log.Println("--- MemoryQueueAdapter disconnected ---")
 return nil
}

func (eb *MemoryQueueAdapter) StartConsuming(ctx context.Context, queueName string) error {
 log.Printf("--- MemoryQueueAdapter StartConsuming queue %s ---", queueName)
 return nil
}

리스너 핸들러를 사용하여 이벤트 바인딩

 

cmd/api 폴더 의 main.go 파일 에 리스너를 등록합니다.

package main

import (
 "context"
 "eda-example/internal/application/controller"
 "eda-example/internal/application/usecase"
 "eda-example/internal/domain/event"
 "eda-example/internal/infra/queue"
 "fmt"
 "log"
 "net/http"
 "reflect"
)

func main() {
 ctx := context.Background()

 // initialize queue
 queue := queue.NewMemoryQueueAdapter()

 // use cases
 createOrderUseCase := usecase.NewCreateOrderUseCase(queue)
 processPaymentUseCase := usecase.NewProcessOrderPaymentUseCase(queue)
 stockMovementUseCase := usecase.NewStockMovementUseCase()
 sendOrderEmailUseCase := usecase.NewSendOrderEmailUseCase()

 // controllers
 orderController := controller.NewOrderController(createOrderUseCase, processPaymentUseCase, stockMovementUseCase, sendOrderEmailUseCase)

 // register routes
 http.HandleFunc("POST /create-order", orderController.CreateOrder)

 // mapping listeners
 var list map[reflect.Type][]func(w http.ResponseWriter, r *http.Request) = map[reflect.Type][]func(w http.ResponseWriter, r *http.Request){
  reflect.TypeOf(event.OrderCreatedEvent{}): {
   orderController.ProcessOrderPayment,
   orderController.StockMovement,
   orderController.SendOrderEmail,
  },
 }

 // register listeners
 for eventType, handlers := range list {
  for _, handler := range handlers {
   queue.ListenerRegister(eventType, handler)
  }
 }

 // connect queue
 err := queue.Connect(ctx)
 if err != nil {
  log.Fatalf("Error connect queue %s", err)
 }
 defer queue.Disconnect(ctx)

 // start consuming queues
 OrderCreatedEvent := reflect.TypeOf(event.OrderCreatedEvent{}).Name()

 go func(ctx context.Context, queueName string) {
  err = queue.StartConsuming(ctx, queueName)
  if err != nil {
   log.Fatalf("Error running consumer %s: %s", queueName, err)
  }
 }(ctx, OrderCreatedEvent)

 // start server
 fmt.Println("Server is running on port 8080")
 http.ListenAndServe(":8080", nil)
}

이벤트 소비

애플리케이션을 다시 시작하세요.

 

엔드포인트 POST /create-order를 다시 호출하고 콘솔 로그를 확인하세요.

추가(RabbitMQ로 구현)

Rabbitmq lib를 설치합니다:

go get github.com/rabbitmq/amqp091-go

 

내부/ 인프라 /queue 폴더  Rabbitmq_adapter.go를 생성합니다 .

package queue

import (
 "bytes"
 "context"
 "encoding/json"
 "errors"
 "log"
 "net/http"
 "reflect"
 "time"

 amqp "github.com/rabbitmq/amqp091-go"
)

type RabbitMQAdapter struct {
 uri       string
 conn      *amqp.Connection
 listeners map[string][]Listener
}

type QueueMessage struct {
 Body []byte
}

func NewRabbitMQAdapter(uri string) *RabbitMQAdapter {
 return &RabbitMQAdapter{
  uri:       uri,
  listeners: make(map[string][]Listener),
 }
}

func (r *RabbitMQAdapter) Connect(ctx context.Context) error {
 conn, err := amqp.Dial(r.uri)
 if err != nil {
  return err
 }
 r.conn = conn
 return nil
}

func (r *RabbitMQAdapter) Disconnect(ctx context.Context) error {
 return r.conn.Close()
}

func (r *RabbitMQAdapter) Publish(ctx context.Context, eventPayload interface{}) error {
 eventName := reflect.TypeOf(eventPayload).Name()

 ch, err := r.conn.Channel()
 if err != nil {
  return err
 }
 defer ch.Close()

 q, err := ch.QueueDeclare(
  eventName, // queue name
  true,      // durable
  false,     // delete when unused
  false,     // exclusive
  false,     // no-wait
  nil,       // arguments
 )
 if err != nil {
  return err
 }

 ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 defer cancel()

 eventJson, err := json.Marshal(eventPayload)
 if err != nil {
  return errors.New("error converting struct to json")
 }

 err = ch.PublishWithContext(ctx,
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
   ContentType: "text/plain",
   Body:        []byte(eventJson),
  })
 if err != nil {
  return err
 }
 log.Printf(" [x] Sent to queue %s: %s\n", eventName, eventJson)
 return nil
}

func (r *RabbitMQAdapter) StartConsuming(ctx context.Context, queueName string) error {
 ch, err := r.conn.Channel()
 if err != nil {
  return err
 }
 defer ch.Close()

 q, err := ch.QueueDeclare(
  queueName, // name
  true,      // durable
  false,     // delete when unused
  false,     // exclusive
  false,     // no-wait
  nil,       // arguments
 )
 if err != nil {
  return err
 }

 msgs, err := ch.ConsumeWithContext(
  ctx,
  q.Name, // queue
  "",     // consumer
  false,  // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
 )
 if err != nil {
  return err
 }

 go func() {
  for d := range msgs {
   log.Printf("Received a message on queue %s: %s", queueName, d.Body)
   hasError := false
   for _, listener := range r.listeners[queueName] {
    w := NewQueueResponseWriter()
    body := bytes.NewBuffer(d.Body)
    r, err := http.NewRequestWithContext(ctx, http.MethodPost, queueName, body)
    if err != nil {
     log.Printf("Error processing message: %s", err)
     hasError = true
     break
    }

    listener.callback(w, r)
    if w.statusCode >= 400 {
     log.Printf("Error processing message: %s", string(w.body))
     hasError = true
     break
    }
   }

   if !hasError {
    d.Ack(false)
   }
  }
 }()

 var forever chan struct{}
 log.Printf(" [*] Waiting for messages on queue %s. To exit press CTRL+C", queueName)
 <-forever
 return nil
}

func (r *RabbitMQAdapter) ListenerRegister(eventType reflect.Type, handler func(w http.ResponseWriter, r *http.Request)) {
 r.listeners[eventType.Name()] = append(r.listeners[eventType.Name()], Listener{eventType, handler})
}

Docker를 사용하여 RabbitMQ 서버 가동

루트 폴더에 docker-compose.yml 파일 생성

services:
    rabbitmq:
        image: "rabbitmq:3.8-management-alpine"
        hostname: rabbitmq
        ports:
        - "15672:15672"
        - "5672:5672"
        volumes:
        - "rabbitmq_data:/var/lib/rabbitmq/mnesia"
        environment:
        - RABBITMQ_DEFAULT_USER=guest
        - RABBITMQ_DEFAULT_PASS=guest

volumes:
  rabbitmq_data:

RabbitMQ 서버 실행

 

docker compose up -d

 

RabbitMQAdapter를 사용하도록 main.go 파일을 변경하세요 .

package main

import (
 "context"
 "eda-example/internal/application/controller"
 "eda-example/internal/application/usecase"
 "eda-example/internal/domain/event"
 "eda-example/internal/infra/queue"
 "fmt"
 "log"
 "net/http"
 "reflect"
)

func main() {
 ctx := context.Background()

 // initialize queue
 queue := queue.NewRabbitMQAdapter("amqp://guest:guest@localhost:5672/")

 // use cases
 createOrderUseCase := usecase.NewCreateOrderUseCase(queue)
 processPaymentUseCase := usecase.NewProcessOrderPaymentUseCase(queue)
 stockMovementUseCase := usecase.NewStockMovementUseCase()
 sendOrderEmailUseCase := usecase.NewSendOrderEmailUseCase()

 // controllers
 orderController := controller.NewOrderController(createOrderUseCase, processPaymentUseCase, stockMovementUseCase, sendOrderEmailUseCase)

 // register routes
 http.HandleFunc("POST /create-order", orderController.CreateOrder)

 // mapping listeners
 var list map[reflect.Type][]func(w http.ResponseWriter, r *http.Request) = map[reflect.Type][]func(w http.ResponseWriter, r *http.Request){
  reflect.TypeOf(event.OrderCreatedEvent{}): {
   orderController.ProcessOrderPayment,
   orderController.StockMovement,
   orderController.SendOrderEmail,
  },
 }

 // register listeners
 for eventType, handlers := range list {
  for _, handler := range handlers {
   queue.ListenerRegister(eventType, handler)
  }
 }

 // connect queue
 err := queue.Connect(ctx)
 if err != nil {
  log.Fatalf("Error connect queue %s", err)
 }
 defer queue.Disconnect(ctx)

 // start consuming queues
 OrderCreatedEvent := reflect.TypeOf(event.OrderCreatedEvent{}).Name()

 go func(ctx context.Context, queueName string) {
  err = queue.StartConsuming(ctx, queueName)
  if err != nil {
   log.Fatalf("Error running consumer %s: %s", queueName, err)
  }
 }(ctx, OrderCreatedEvent)

 // start server
 fmt.Println("Server is running on port 8080")
 http.ListenAndServe(":8080", nil)
}

 

서버를 실행하고 주문 생성 엔드포인트를 호출합니다.

 

RabbitMQ 대시보드에서 생성 및 사용된 대기열을 확인하세요.

전체 코드: https://github.com/hanhyeonkyu/event-driven-golang

300x250

관련글 더보기

댓글 영역