RabbitMQ, 도메인 중심 설계 및 클린 아키텍처 사용
EDA(이벤트 중심 아키텍처)는 애플리케이션의 느슨한 결합, 확장성, 독립성 및 복원성을 활성화하기 위해 이벤트의 생성, 감지 및 소비를 강조하는 소프트웨어 디자인 패턴입니다.
주황색 포스트로 표시되는 이벤트는 과거 에 발생한 중요한 사실이며 일반적으로 UserCreated, OrderPaid, InvoiceCanceled 등과 같은 명령을 실행한 결과입니다.
파란색 포스트로 표시되는 명령은 시스템의 작업 또는 의도이며 항상 명령형 모드입니다(예: CreateUser, PlaceOrder, PayOrder, GetProducts 등).
노란색 포스트로 표시되는 행위자는 명령을 실행하는 사람입니다(예: 사용자, 고객, 판매자, 구매자, 시스템 등).
폴더 구조(여기서는 Clean Arch 및 DDD 레이어 개념을 사용하고 있습니다)
내부/도메인/이벤트 폴더 에 이벤트 order_created_event.go를 생성합니다.
package event
type OrderCreatedEvent struct {
Id string
Items []OrderItem
TotalPrice float64
Status string
}
type OrderItem struct {
ProductName string
Quantity int
TotalPrice float64
}
내부/도메인/이벤트 폴더 에 이벤트 order_paid_event.go 생성
package event
import (
"time"
)
type OrderPaidEvent struct {
OrderId string
PaidValue float64
PaymentDate time.Time
}
먼저 uuid 패키지를 사용하여 ID를 생성하고 셸에서 실행합니다.
go mod init eda-example
go get github.com/google/uuid
내부/도메인/엔티티 폴더 에 엔터티 order_item_entity.go를 생성합니다.
package entity
type OrderItemEntity struct {
productName string
productPrice float64
quantity int
}
func NewOrderItemEntity(productName string, productPrice float64, quantity int) *OrderItemEntity {
return &OrderItemEntity{
productName: productName,
productPrice: productPrice,
quantity: quantity,
}
}
// getters
func (o *OrderItemEntity) GetProductName() string {
return o.productName
}
func (o *OrderItemEntity) GetProductPrice() float64 {
return o.productPrice
}
func (o *OrderItemEntity) GetQuantity() int {
return o.quantity
}
func (o *OrderItemEntity) GetTotalPrice() float64 {
return o.productPrice * float64(o.quantity)
}
내부/도메인/엔티티 폴더 에 엔터티 order_entity.go를 생성합니다.
package entity
import (
"errors"
"github.com/google/uuid"
)
const (
OrderStatusPending = "pending"
OrderStatusPaid = "paid"
)
type OrderEntity struct {
id string
status string
items []*OrderItemEntity
totalPrice float64
paidValue float64
}
func NewOrderEntity() (*OrderEntity, error) {
return &OrderEntity{
id: uuid.New().String(),
status: OrderStatusPending,
}, nil
}
// to populate with an existing order
func RestoreOrderEntity(id, status string) (*OrderEntity, error) {
return &OrderEntity{
id: id,
status: status,
}, nil
}
func (o *OrderEntity) AddItem(item *OrderItemEntity) {
o.items = append(o.items, item)
o.totalPrice += item.GetTotalPrice()
}
func (o *OrderEntity) Pay(value float64) error {
if value < o.totalPrice {
return errors.New("value is less than the total price")
}
o.paidValue = value
o.status = OrderStatusPaid
return nil
}
// getters
func (o *OrderEntity) GetItems() []*OrderItemEntity {
return o.items
}
func (o *OrderEntity) GetTotalPrice() float64 {
return o.totalPrice
}
func (o *OrderEntity) GetID() string {
return o.id
}
func (o *OrderEntity) GetStatus() string {
return o.status
}
내부/도메인/대기열 폴더 에 인터페이스 publisher.go를 생성합니다.
package queue
import "context"
type Publisher interface {
Publish(ctx context.Context, body interface{}) error
}
내부/응용 프로그램/dto 폴더 에 dto create_order_dto.go를 생성합니다.
package dto
type CreateOrderDTO struct {
Items []Item `json:"items"`
}
type Item struct {
ProductId string `json:"product_id"`
Qtd int `json:"qtd"`
}
내부/애플리케이션/유스케이스 폴더 에 유스케이스 create_order_usecase.go 생성
package usecase
import (
"context"
"fmt"
"eda-example/internal/application/dto"
"eda-example/internal/domain/entity"
"eda-example/internal/domain/event"
"eda-example/internal/domain/queue"
)
type CreateOrderUseCase struct {
publisher queue.Publisher
}
func NewCreateOrderUseCase(publisher queue.Publisher) *CreateOrderUseCase {
return &CreateOrderUseCase{
publisher: publisher,
}
}
func (u *CreateOrderUseCase) Execute(ctx context.Context, input dto.CreateOrderDTO) error {
fmt.Println("--- CreateOrderUseCase ---")
// create order
order, err := entity.NewOrderEntity()
if err != nil {
return err
}
for _, item := range input.Items {
// TODO: find product in the repository database here
fakeProductName := "Product " + item.ProductId
fakeProductPrice := 10.50
// create fake order item
i := entity.NewOrderItemEntity(fakeProductName, fakeProductPrice, item.Qtd)
// add items to order
order.AddItem(i)
}
// TODO: save the order in the repository database here
var eventItems []event.OrderItem
for _, item := range order.GetItems() {
eventItems = append(eventItems, event.OrderItem{
ProductName: item.GetProductName(),
TotalPrice: item.GetTotalPrice(),
Quantity: item.GetQuantity(),
})
}
// publish event OrderCreatedEvent passing the order data
err = u.publisher.Publish(ctx, event.OrderCreatedEvent{
Id: order.GetID(),
TotalPrice: order.GetTotalPrice(),
Status: order.GetStatus(),
Items: eventItems,
})
if err != nil {
return err
}
return nil
}
내부/응용 프로그램/컨트롤러 폴더 에 컨트롤러 order_controller.go를 생성합니다.
package controller
import (
"eda-example/internal/application/dto"
"eda-example/internal/application/usecase"
"encoding/json"
"net/http"
)
type OrderController struct {
createOrderUserCase *usecase.CreateOrderUseCase
}
func NewOrderController(createOrderUserCase *usecase.CreateOrderUseCase) *OrderController {
return &OrderController{
createOrderUserCase: createOrderUserCase,
}
}
func (u *OrderController) CreateOrder(w http.ResponseWriter, r *http.Request) {
var requestData dto.CreateOrderDTO
json.NewDecoder(r.Body).Decode(&requestData)
err := u.createOrderUserCase.Execute(r.Context(), requestData)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
}
내부/ 인프라 /queue 폴더 에 memory_queue_adapter.go 생성
package queue
import (
"context"
"encoding/json"
"log"
"reflect"
)
type MemoryQueueAdapter struct {
}
func NewMemoryQueueAdapter() *MemoryQueueAdapter {
return &MemoryQueueAdapter{}
}
func (eb *MemoryQueueAdapter) Publish(ctx context.Context, eventPayload interface{}) error {
eventType := reflect.TypeOf(eventPayload)
payloadJson, _ := json.Marshal(eventPayload)
log.Printf("** [Publish] %s: %v ---", eventType, payloadJson)
return nil
}
cmd/api 폴더 에 main.go 생성
package main
import (
"eda-example/internal/application/controller"
"eda-example/internal/application/usecase"
"eda-example/internal/infra/queue"
"fmt"
"net/http"
)
func main() {
// initialize dependencies and implementations
queue := queue.NewMemoryQueueAdapter()
createOrderUseCase := usecase.NewCreateOrderUseCase(queue)
orderController := controller.NewOrderController(createOrderUseCase)
// register routes
http.HandleFunc("POST /create-order", orderController.CreateOrder)
// start server
fmt.Println("Server is running on port 8080")
http.ListenAndServe(":8080", nil)
}
주문을 생성하기 위해 엔드포인트를 생성합니다. 주문을 생성한 직후에는 주문이 생성되었음을 알리는 이벤트가 게시됩니다.
이제 이 이벤트를 수신 하고 여기에서 다른 명령(ProcessOrderPayment, StockMovement 및 SendOrderEmail)을 실행 해야 합니다.
내부/애플리케이션/유즈케이스 폴더 에 process_order_pay_usecase.go 생성
package usecase
import (
"context"
"fmt"
"time"
"eda-example/internal/domain/entity"
"eda-example/internal/domain/event"
"eda-example/internal/domain/queue"
)
type ProcessOrderPaymentUseCase struct {
publisher queue.Publisher
}
func NewProcessOrderPaymentUseCase(publisher queue.Publisher) *ProcessOrderPaymentUseCase {
return &ProcessOrderPaymentUseCase{
publisher: publisher,
}
}
func (h *ProcessOrderPaymentUseCase) Execute(ctx context.Context, payload *event.OrderCreatedEvent) error {
fmt.Println("--- ProcessOrderPaymentUseCase ---")
// TODO: find order by id in the repository database here
order, err := entity.RestoreOrderEntity(payload.Id, payload.Status)
if err != nil {
return err
}
for _, i := range payload.Items {
item := entity.NewOrderItemEntity(i.ProductName, i.TotalPrice/float64(i.Quantity), i.Quantity)
order.AddItem(item)
}
paymentValue := payload.TotalPrice
err = order.Pay(paymentValue)
if err != nil {
return err
}
fmt.Printf("Order Paid. Value: %f \n", payload.TotalPrice)
err = h.publisher.Publish(ctx, event.OrderPaidEvent{OrderId: payload.Id, PaidValue: paymentValue, PaymentDate: time.Now()})
if err != nil {
return err
}
return nil
}
내부/애플리케이션/유스케이스 폴더 에 stock_movement_usecase.go 생성
package usecase
import (
"context"
"eda-example/internal/domain/event"
"fmt"
)
type StockMovementUseCase struct {
}
func NewStockMovementUseCase() *StockMovementUseCase {
return &StockMovementUseCase{}
}
func (h *StockMovementUseCase) Execute(ctx context.Context, payload *event.OrderCreatedEvent) error {
fmt.Println("--- StockMovementUseCase ---")
for _, item := range payload.Items {
fmt.Printf("Removing %d items of product %s from stock\n", item.Quantity, item.ProductName)
}
return nil
}
내부/애플리케이션/유즈케이스 폴더 에 send_orderemail_usecase.go 생성
package usecase
import (
"context"
"eda-example/internal/domain/event"
"fmt"
)
type SendOrderEmailUseCase struct {
}
func NewSendOrderEmailUseCase() *SendOrderEmailUseCase {
return &SendOrderEmailUseCase{}
}
func (h *SendOrderEmailUseCase) Execute(ctx context.Context, payload *event.OrderCreatedEvent) error {
fmt.Println("--- SendOrderEmailUseCase ---")
fmt.Printf("--- MAIL Order Created: R$ %f \n", payload.TotalPrice)
return nil
}
내부/응용 프로그램/컨트롤러 폴더 의 order_controller.go 에 ProcessOrderPayment , StockMovement 및 SendOrderEmail 메서드를 만듭니다 .
package controller
import (
"encoding/json"
"net/http"
"eda-example/internal/application/dto"
"eda-example/internal/application/usecase"
"eda-example/internal/domain/event"
)
type OrderController struct {
createOrderUserCase *usecase.CreateOrderUseCase
processOrderPaymentUseCase *usecase.ProcessOrderPaymentUseCase
stockMovementUseCase *usecase.StockMovementUseCase
sendOrderEmailUseCase *usecase.SendOrderEmailUseCase
}
func NewOrderController(createOrderUserCase *usecase.CreateOrderUseCase,
processOrderPaymentUseCase *usecase.ProcessOrderPaymentUseCase,
stockMovementUseCase *usecase.StockMovementUseCase,
sendOrderEmailUseCase *usecase.SendOrderEmailUseCase) *OrderController {
return &OrderController{
createOrderUserCase: createOrderUserCase,
processOrderPaymentUseCase: processOrderPaymentUseCase,
stockMovementUseCase: stockMovementUseCase,
sendOrderEmailUseCase: sendOrderEmailUseCase,
}
}
func (u *OrderController) CreateOrder(w http.ResponseWriter, r *http.Request) {
var requestData dto.CreateOrderDTO
json.NewDecoder(r.Body).Decode(&requestData)
err := u.createOrderUserCase.Execute(r.Context(), requestData)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
}
func (u *OrderController) ProcessOrderPayment(w http.ResponseWriter, r *http.Request) {
var body event.OrderCreatedEvent
json.NewDecoder(r.Body).Decode(&body)
err := u.processOrderPaymentUseCase.Execute(r.Context(), &body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
}
func (u *OrderController) StockMovement(w http.ResponseWriter, r *http.Request) {
var body event.OrderCreatedEvent
json.NewDecoder(r.Body).Decode(&body)
err := u.stockMovementUseCase.Execute(r.Context(), &body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
}
func (u *OrderController) SendOrderEmail(w http.ResponseWriter, r *http.Request) {
var body event.OrderCreatedEvent
json.NewDecoder(r.Body).Decode(&body)
err := u.sendOrderEmailUseCase.Execute(r.Context(), &body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
w.WriteHeader(http.StatusCreated)
}
내부/ 인프라 /queue 폴더 에 listener.go 생성
package queue
import (
"net/http"
"reflect"
)
type Listener struct {
eventType reflect.Type
callback func(w http.ResponseWriter, r *http.Request)
}
내부/ 인프라 /queue 폴더 에 response_writer.go 생성
package queue
import (
"fmt"
"net/http"
)
type QueueResponseWriter struct {
body []byte
statusCode int
header http.Header
}
func NewQueueResponseWriter() *QueueResponseWriter {
return &QueueResponseWriter{
header: http.Header{},
}
}
func (w *QueueResponseWriter) Header() http.Header {
return w.header
}
func (w *QueueResponseWriter) Write(b []byte) (int, error) {
w.body = b
return 0, nil
}
func (w *QueueResponseWriter) WriteHeader(statusCode int) {
w.statusCode = statusCode
}
var okFn = func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
func main() {
r := &http.Request{
Method: http.MethodPost,
}
w := NewQueueResponseWriter()
okFn(w, r)
fmt.Println(w.statusCode)
}
내부/도메인/queue 폴더 에 queue.go 생성
package queue
import (
"context"
"net/http"
"reflect"
)
type Queue interface {
ListenerRegister(eventType reflect.Type, handler func(w http.ResponseWriter, r *http.Request))
Connect(ctx context.Context) error
Disconnect(ctx context.Context) error
Publish(ctx context.Context, body interface{}) error
StartConsuming(ctx context.Context, queueName string) error
}
내부/ 인프라 /queue 폴더 의 memory_queue_adapter.go 에 메소드 구현
package queue
import (
"bytes"
"context"
"encoding/json"
"log"
"net/http"
"reflect"
)
type MemoryQueueAdapter struct {
listeners map[string][]Listener
}
func NewMemoryQueueAdapter() *MemoryQueueAdapter {
return &MemoryQueueAdapter{
listeners: make(map[string][]Listener),
}
}
func (eb *MemoryQueueAdapter) ListenerRegister(eventType reflect.Type, handler func(w http.ResponseWriter, r *http.Request)) {
eb.listeners[eventType.Name()] = append(eb.listeners[eventType.Name()], Listener{eventType, handler})
}
func (eb *MemoryQueueAdapter) Publish(ctx context.Context, eventPayload interface{}) error {
eventType := reflect.TypeOf(eventPayload)
payloadJson, _ := json.Marshal(eventPayload)
log.Printf("--- Publish %s ---", eventType)
for _, listener := range eb.listeners[eventType.Name()] {
w := NewQueueResponseWriter()
body := bytes.NewBuffer(payloadJson)
r, err := http.NewRequestWithContext(ctx, http.MethodPost, eventType.Name(), body)
if err != nil {
return err
}
listener.callback(w, r)
if err != nil {
return err
}
}
return nil
}
func (eb *MemoryQueueAdapter) Connect(ctx context.Context) error {
log.Println("--- MemoryQueueAdapter connected ---")
return nil
}
func (eb *MemoryQueueAdapter) Disconnect(ctx context.Context) error {
log.Println("--- MemoryQueueAdapter disconnected ---")
return nil
}
func (eb *MemoryQueueAdapter) StartConsuming(ctx context.Context, queueName string) error {
log.Printf("--- MemoryQueueAdapter StartConsuming queue %s ---", queueName)
return nil
}
cmd/api 폴더 의 main.go 파일 에 리스너를 등록합니다.
package main
import (
"context"
"eda-example/internal/application/controller"
"eda-example/internal/application/usecase"
"eda-example/internal/domain/event"
"eda-example/internal/infra/queue"
"fmt"
"log"
"net/http"
"reflect"
)
func main() {
ctx := context.Background()
// initialize queue
queue := queue.NewMemoryQueueAdapter()
// use cases
createOrderUseCase := usecase.NewCreateOrderUseCase(queue)
processPaymentUseCase := usecase.NewProcessOrderPaymentUseCase(queue)
stockMovementUseCase := usecase.NewStockMovementUseCase()
sendOrderEmailUseCase := usecase.NewSendOrderEmailUseCase()
// controllers
orderController := controller.NewOrderController(createOrderUseCase, processPaymentUseCase, stockMovementUseCase, sendOrderEmailUseCase)
// register routes
http.HandleFunc("POST /create-order", orderController.CreateOrder)
// mapping listeners
var list map[reflect.Type][]func(w http.ResponseWriter, r *http.Request) = map[reflect.Type][]func(w http.ResponseWriter, r *http.Request){
reflect.TypeOf(event.OrderCreatedEvent{}): {
orderController.ProcessOrderPayment,
orderController.StockMovement,
orderController.SendOrderEmail,
},
}
// register listeners
for eventType, handlers := range list {
for _, handler := range handlers {
queue.ListenerRegister(eventType, handler)
}
}
// connect queue
err := queue.Connect(ctx)
if err != nil {
log.Fatalf("Error connect queue %s", err)
}
defer queue.Disconnect(ctx)
// start consuming queues
OrderCreatedEvent := reflect.TypeOf(event.OrderCreatedEvent{}).Name()
go func(ctx context.Context, queueName string) {
err = queue.StartConsuming(ctx, queueName)
if err != nil {
log.Fatalf("Error running consumer %s: %s", queueName, err)
}
}(ctx, OrderCreatedEvent)
// start server
fmt.Println("Server is running on port 8080")
http.ListenAndServe(":8080", nil)
}
애플리케이션을 다시 시작하세요.
엔드포인트 POST /create-order를 다시 호출하고 콘솔 로그를 확인하세요.
Rabbitmq lib를 설치합니다:
go get github.com/rabbitmq/amqp091-go
내부/ 인프라 /queue 폴더 에 Rabbitmq_adapter.go를 생성합니다 .
package queue
import (
"bytes"
"context"
"encoding/json"
"errors"
"log"
"net/http"
"reflect"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type RabbitMQAdapter struct {
uri string
conn *amqp.Connection
listeners map[string][]Listener
}
type QueueMessage struct {
Body []byte
}
func NewRabbitMQAdapter(uri string) *RabbitMQAdapter {
return &RabbitMQAdapter{
uri: uri,
listeners: make(map[string][]Listener),
}
}
func (r *RabbitMQAdapter) Connect(ctx context.Context) error {
conn, err := amqp.Dial(r.uri)
if err != nil {
return err
}
r.conn = conn
return nil
}
func (r *RabbitMQAdapter) Disconnect(ctx context.Context) error {
return r.conn.Close()
}
func (r *RabbitMQAdapter) Publish(ctx context.Context, eventPayload interface{}) error {
eventName := reflect.TypeOf(eventPayload).Name()
ch, err := r.conn.Channel()
if err != nil {
return err
}
defer ch.Close()
q, err := ch.QueueDeclare(
eventName, // queue name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
eventJson, err := json.Marshal(eventPayload)
if err != nil {
return errors.New("error converting struct to json")
}
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(eventJson),
})
if err != nil {
return err
}
log.Printf(" [x] Sent to queue %s: %s\n", eventName, eventJson)
return nil
}
func (r *RabbitMQAdapter) StartConsuming(ctx context.Context, queueName string) error {
ch, err := r.conn.Channel()
if err != nil {
return err
}
defer ch.Close()
q, err := ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
msgs, err := ch.ConsumeWithContext(
ctx,
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return err
}
go func() {
for d := range msgs {
log.Printf("Received a message on queue %s: %s", queueName, d.Body)
hasError := false
for _, listener := range r.listeners[queueName] {
w := NewQueueResponseWriter()
body := bytes.NewBuffer(d.Body)
r, err := http.NewRequestWithContext(ctx, http.MethodPost, queueName, body)
if err != nil {
log.Printf("Error processing message: %s", err)
hasError = true
break
}
listener.callback(w, r)
if w.statusCode >= 400 {
log.Printf("Error processing message: %s", string(w.body))
hasError = true
break
}
}
if !hasError {
d.Ack(false)
}
}
}()
var forever chan struct{}
log.Printf(" [*] Waiting for messages on queue %s. To exit press CTRL+C", queueName)
<-forever
return nil
}
func (r *RabbitMQAdapter) ListenerRegister(eventType reflect.Type, handler func(w http.ResponseWriter, r *http.Request)) {
r.listeners[eventType.Name()] = append(r.listeners[eventType.Name()], Listener{eventType, handler})
}
루트 폴더에 docker-compose.yml 파일 생성
services:
rabbitmq:
image: "rabbitmq:3.8-management-alpine"
hostname: rabbitmq
ports:
- "15672:15672"
- "5672:5672"
volumes:
- "rabbitmq_data:/var/lib/rabbitmq/mnesia"
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
volumes:
rabbitmq_data:
docker compose up -d
RabbitMQAdapter를 사용하도록 main.go 파일을 변경하세요 .
package main
import (
"context"
"eda-example/internal/application/controller"
"eda-example/internal/application/usecase"
"eda-example/internal/domain/event"
"eda-example/internal/infra/queue"
"fmt"
"log"
"net/http"
"reflect"
)
func main() {
ctx := context.Background()
// initialize queue
queue := queue.NewRabbitMQAdapter("amqp://guest:guest@localhost:5672/")
// use cases
createOrderUseCase := usecase.NewCreateOrderUseCase(queue)
processPaymentUseCase := usecase.NewProcessOrderPaymentUseCase(queue)
stockMovementUseCase := usecase.NewStockMovementUseCase()
sendOrderEmailUseCase := usecase.NewSendOrderEmailUseCase()
// controllers
orderController := controller.NewOrderController(createOrderUseCase, processPaymentUseCase, stockMovementUseCase, sendOrderEmailUseCase)
// register routes
http.HandleFunc("POST /create-order", orderController.CreateOrder)
// mapping listeners
var list map[reflect.Type][]func(w http.ResponseWriter, r *http.Request) = map[reflect.Type][]func(w http.ResponseWriter, r *http.Request){
reflect.TypeOf(event.OrderCreatedEvent{}): {
orderController.ProcessOrderPayment,
orderController.StockMovement,
orderController.SendOrderEmail,
},
}
// register listeners
for eventType, handlers := range list {
for _, handler := range handlers {
queue.ListenerRegister(eventType, handler)
}
}
// connect queue
err := queue.Connect(ctx)
if err != nil {
log.Fatalf("Error connect queue %s", err)
}
defer queue.Disconnect(ctx)
// start consuming queues
OrderCreatedEvent := reflect.TypeOf(event.OrderCreatedEvent{}).Name()
go func(ctx context.Context, queueName string) {
err = queue.StartConsuming(ctx, queueName)
if err != nil {
log.Fatalf("Error running consumer %s: %s", queueName, err)
}
}(ctx, OrderCreatedEvent)
// start server
fmt.Println("Server is running on port 8080")
http.ListenAndServe(":8080", nil)
}
서버를 실행하고 주문 생성 엔드포인트를 호출합니다.
RabbitMQ 대시보드에서 생성 및 사용된 대기열을 확인하세요.
Wagtail vs Wordpress - 프로젝트에 적합한 CMS를 선택하는 방법 (1) | 2024.04.18 |
---|---|
Kubernetes의 데이터베이스는 좋은 생각인가요? (1) | 2024.04.18 |
당신의 마음을 사로잡을 가장 멋진 AI 프로젝트 6가지 (0) | 2024.04.16 |
DevOps 엔지니어를 위한 BASH/Linux 면접 질문 (0) | 2024.04.16 |
소프트웨어 엔지니어에서 소프트웨어 설계자로 — 성공을 위한 로드맵 (0) | 2024.04.16 |
댓글 영역