만들리에/오락실 (스트리밍 게임)

추억의 오락실 (8/11) - 오락기와 메시지큐

gamz 2021. 2. 2. 22:15

이번에 다룰 컴포넌트는 오락기이며 학습해볼만한 주제는 메시지큐로 잡아봤다.

유스케이스

오락기는 외부에서 요청할만한 엔드포인트를 가지 않는 백엔드 컴포넌트로써 다음과 같은 유스케이스가 있다.

- WebRTC 세션을 맺기 위해 아줌마를 거쳐서 클라이언트와 Signaling (SDP/ICE Exchange)을 하게 되는데 이때 아줌마와 통신을 위해 메시지큐(RabbitMQ)를 이용

게임(에물레이션)이 돌아가는 기판으로부터 IPC를 통해 인코딩된 비디오/오디오 패킷을 가져오고 유저의 컨트롤데이터를 기판에 전송하기 위해서도 메시지큐(Nanomsg)를 이용

비슷하면서도 다른 두 사례를 통해 각각이 사용하는 메시지큐의 특징과 용법 등을 알아보고 RabbitMQ를 이용한 간단한 수제 RPC 프레임워크인 mqrpc 도 소개해본다.

 

오락기와 아줌마 통신

 

RabbitMQ

RabbitMQ 는 AMQP(Advanced Message Queuing Protocol)의 구현체이다.

좋다. 여기서부터 시작해보자. 그럼 AMQP는 뭔가? 위키를 훑어 요약해보자면,

at-most-once, at-least-once, exactly-once 를 보장하는 메시지 통신 뿐만 아니라 SASL, TLS 기반의 인증과 보안을 제공하는 message-oriented middleware 를 위한 open standard application layer protocol이다.

 

약간의 역사

2003년에 JPMorgan Chase in London 에서 일하던 John O'Hara 가 처음으로 프로토콜을 만들었고 후에는 아주 많은(위키참고) IT 기업들이 Working Group에 참가해서 발전시켜나가고 있다. 최근(?) 행적으로는 2011 년 8월 이 Working Group이 OASIS의 멤버로 다시 조직화되었고 그해 10월에 AMQP 1.0을 릴리즈한다. 그 후로 두차례 개정을 통해 2012년 10월에 OASIS 표준안으로 채택되고 2014년 4월에는 ISO와 IEC 국제 표준으로 채택된다. (잘나가네..)

* 이 포스팅을 쓰려고 AMQP 찾아보다가 AMQP 버전이 좀 특이한거 같아 남겨두려고 한다.
- 주요 버전은 0-8 (2006-06) / 0-9 (2006/10), 0-10 (2008/2), 0-9-1 (2008/11) / 1.0 (2011-10)
- 1.0 이전 버전들은 .(점)대신 -(대시)를 씀
- 0-9-1가 0-10보다 먼저 나옴 (아마도 0-9의 패치일텐데 0-9 랑 0-10 이랑 많이 다른가?)
- 1.0 과 그 이전 버전들은 엄청나게 많이 차이난다고 함 (참고)

 

구현체

그래서 RabbitMQ는 AMQP 0-9-1 를 기반으로한 Erlang 으로 작성된 구현체(메시지 브로커)이다.

 

다른 유명(?)한 구현체로는,

- ActiveMQ

- OpenAMQ

- StormMQ

- Apache Qpid

- Red Hat Enterprise MRG

 

 

핵심개념

 

 

위 그림에서 가운데 큰 박스가 RabbitMQ 메시지 브로커이다. 이 브로커는 Publisher(혹은 Producer)로 부터 메시지를 받아서 이를 Consumer에게 라우팅을 해준다.

 

브로커 안을 조금더 들여다보자. Publisher가 발행한 메시지는 먼저 Exchange에 도착하게 되고 이 Exchange는 Binding이라고 불리는 규칙을 이용해 특정 Queue에 메시지를 복사해 넣는다. 그럼 이 Queue를 수신하고 있는 Consumer가 메시지를 당겨(fetch/pull)갈 수 있게 된다.

 

Exchange - Binding(Routing Rule) - Queue, 이 세가지가 핵심이다. 어떤 Exchange 타입을 쓰느냐 어떤 Binding(라우팅)을 구성하느냐에 따라서 다양한 메시지 통신 모델을 만들어낼 수 있다.

 

 

메시징 모델

가장 많이 쓰일만한 메시징 모델을 (스틸해서) 소개해본다.

 

Basic

Exchange 없이(진짜 없는건 아니고 nameless로 하나 생김) Publisher가 Consumer가 Subscribe하는 Queue에 바로 꽂는다. (exchange="", routing_key="queue_name")

 

 

Worker Queue

동일한 Queue에 여러 Consumer가 Subscribe하게 되면 이들끼리 메시지를 나눠갖게 된다. (ack/fairness 설정 등을 잘 살펴줘야함)

 

 

Publish/Subscribe (Fanout)

Fanout 타입의 Exchange에 여러 Queue를 Binding시킨다. 그리고 각 Queue를 Consumer들이 각각 Subscribe한다. (exchage_type="fanout", routing_key="")

 

 

Topic

Topic 타입의 Exchange에 관심있는 Routing 규칙을 정의하면서 Queue를 Binding 시킨다. (exchange_type="topic", routing_key="*.orange.*")

 

 

RPC

Server(RPC 수신측)와 Client(RPC 호출측)가 각각 자신의 Queue를 Subscribe 하고 있는 형태. Client가 Server의 Queue에 요청(메시지)을 남기며 어디로 응답(메시지)을 받을지 Queue 이름을 같이 전달한다. (아래 예시에서는 reply_to)

 

 

MqRPC

이 AMQP를 사용하기 위한 클라이언트(드라이버)는 프로그래밍 언어별로, 또 같은 언어라도 다양한 오픈소스들이 있다. 오락기와 아줌마에서 사용하는 Golang에서는 streadway/amqp 를 쓰면된다.

 

그런데 오락기와 아줌마 사이에 주고받을 메시지가 많기도하고 다양한 패턴(SendAndWait, FireAndForget)이 있을텐데 이때마다 저 클라이언트를 날로 쓸 수는 없는 노릇. 뭔가 정형화되고 정리된 형태로 쓸 수 있으면 좋겠다는 생각이 들어 간단한 MQ 기반의 RPC 프레임워크(sangwonl/mqrpc)를 하나 만들어 봤다.

 

오락기 적용 예시

// @mq/routes.go
package handlers

stype Route struct {
    MsgType mqrpc.MsgType
    Handler mqrpc.HandlerFunc
}

// @mq/handlers.go
package handlers

import (
    "encoding/json"
    "github.com/sangwonl/mqrpc"
    "models"
    "usecases"
)

type GameHandler struct {
    GameCtrlUseCase *usecases.GameCtrlUseCase
}

func (h *GameHandler) handlePlayerJoined(ctx *mqrpc.Context) interface{} {
    var playerPart models.PlayerParticipation
    json.Unmarshal(ctx.GetMessage().Payload, &playerPart)
    h.GameCtrlUseCase.JoinGame(playerPart.GameId, playerPart.PlayerId)
    return nil
}

...

func (h *GameHandler) Routes() []Route {
    return []Route{
        {MsgType: models.MsgPlayerJoined, Handler: h.handlePlayerJoined},
        ...
    }
}

// @app.go
package main

import (
    "github.com/oraksil/azumma/internal/presenter/mq/handlers"
    "github.com/sangwonl/mqrpc"
)

func setupRoutes(mqSvc *mqrpc.MqService, routes []handlers.Route) {
    for _, r := range routes {
        err := mqSvc.AddHandler(r.MsgType, r.Handler)
        panic(err)
    }
}

func main() {
    ...
    di.InitContainer()

    ...    
    di.Resolve(func(
        mqSvc *mqrpc.MqService,
        gameHandler *handlers.GameHandler) {

        setupRoutes(mqSvc, gameHandler.Routes())
        ...

        mqSvc.Run(true)
    })
    
    ...
}

 

오락기와 기판 통신

 

Nanomsg

동기

Nanomsg도 메시지 큐인데 오락기 기판 통신도 RabbitMQ를 사용하면 되는거 아닌가? 그래. 사용 못할 건 없지. 하지만 몇가지 이슈가 금방 눈에 띈다.

 

  • 오락기-기판 짝이 매우 많이 늘어난다면? 모든 기판과 오락기가 하나의 RabbitMQ 브로커에 붙다보면 언젠간 불가능할 스케일이 생길거고 이럴땐 오락기-기판을 그룹핑해서 각 그룹마다 새로운 RabbitMQ 브로커를 할당해야할지도 모른다. 계속적으로 메시지큐만을 위한 컴퓨팅 자원이 추가된다. 이는 곧 비용.

  • 돈이 썩어나서 RabbitMQ를 늘리는데 아무 이슈가 없다고 해보자. 그래도 걸리는 이슈가 있다. 기판에서 오락기로 전송되는 데이터는 비디오/오디오 데이터인데 아무리 인코딩을 했다지만 그래도 초당 수백k는 될 것이다. 이것들이 기판에서 네트워크를 타고 브로커까지 간 후 또 그안에서 버퍼 복사가 이뤄지고 다시 네트워크를 타고 오락기로 도착하는 과정에서 복사 및 네트워크 오버헤드가 너무 크다.

 

그럼 어떻게 하면 좋을까? 애초에 브로커가 없다면 그런 고민은 필요 없지 않을까? 오락기-기판 프로세스가 브로커 없이 직접 통신을하되 다양한 메시지큐 모델을 구현할 수 있게 도와주는 솔루션은 없을까?

 

이게 바로 Nanomsg를 선택한 동기라고 볼 수 있다. 이를 이용하면 브로커 없이 노드간에 직접 메세지큐를 놓을 수 있다. 사실 이런류의 메시지큐 중에는 ZeroMQ가 유명하다. 둘의 차이는 대동소이하지만 Nanomsg가 최근에 나오기도 했고 더 POSIX 호환을 지킨다고 해서 이걸 선택해봤다.

 

 

메시징 모델

Nanomsg도 RabbitMQ 처럼 다양한 메시지 모델을 고려할 수 있다.

 

Pipeline (A One-Wy Pipe)

한쪽노드에서 일방적으로 보내기만 하는 케이스에 쓰인다. (node0: NN_PUSH, node1: NN_PULL)

 

 

Request/Reply (I ask, You answer)

RPC처럼 요청을 하고 응답을 서버-클라이언트 구조에 사용된다. 여러 클라이언트 노드들이 하나의 서버에 요청을 보내고 응답을 받는 방식이다. (node0: NN_REQ, node1: NN_REP)

 

 

Pair (Two Way Radio)

하나의 커넥션으로 1:1 노드 연결 후 양방향 통신에 사용된다. (node0: NN_PAIR, node1: NN_PAIR)

 

 

Pub/Sub (Topics & Broadcast)

하나의 서버(Broadcaster)가 여러 클라이언트(Subscribers)에게 그들이 관심 있는 토픽에 따라 메시지 전달 (clientN: NN_SUB, server: NN_PUB)

 

 

Survey (Everybody votes)

서버가 연결된 클라이언트한테 설문 요청(보통 상태 수집)을 보내고 각 클라이언트는 이에 응답을 보내는 구조이다. 혹은 클라이언트 응답 타임아웃을 판단하기도 한다. (clientN: NN_RESPONDENT, server: NN_SURVEYOR)

 

 

Bus (Routing)

여러 노드들이 각각 서로서로 말그대로 막 연결되는 방식이다. 각 노드가 보낸 메시지는 자기한테 연결된 다른 노드들에게 전송된다. (nodeN: NN_BUS)

 

 

적용 예시

 

아래 기판 코드는 인코딩된 프레임을 받아서 Nanomsg IPC Queue에 Push하는 로직이다. Push 모드 소켓을 하나 만들고 bind 한 뒤(오락기가 기판으로 connect 하기 때문에) 인코딩된 프레임이 들어올때마다 소켓에 write 를 한다.

use nanomsg::{Socket, Protocol};
...

fn run_frame_handler(
    props: &GameProperties,
    frame_rx: channel::Receiver<libenc::EncodedFrame>) {

    // ex) props.imageframe_output => "./images.ipc"
    let frame_output_path = String::from(&props.imageframe_output);

    thread::spawn(move || {
        let mut socket = Socket::new(Protocol::Push).unwrap();
        socket.set_send_buffer_size(4096 * 1024).unwrap();
        socket.bind(&frame_output_path).unwrap();

        loop {
            let frame = frame_rx.recv().unwrap();
            socket.write_all(frame.buf.as_ref()).unwrap();
        }
    });
}

fn main() {
    let args: Vec<String> = env::args().collect();
    let props = extract_properties_from_args(&args);
    ...
    
    run_frame_handler(&props, img_frame_rx);
    ...

    emu.run(&props.system_name);
}

 

이제 Nanomsg IPC Queue에 들어온 데이터를 Pull해가는 오락기 코드를 보자. 게임엔진이 fork한 goroutine loop이 GipanDriver와 IpcBuffer 통해 프레임 데이터를 가져오는 로직들이다. Nanomsg 관련 부분을 집중해서 보자면, 오락기는 반대로 Pull 모드 소켓을 열고 기판이 열어둔 IPC file로 connect를 하고 recv를 통해 데이터를 fetch 해온다.

// @utils.go
package utils

import "github.com/op/go-nanomsg"

type IpcBuffer interface {
	Read() ([]byte, error)
	Write(buf []byte) error
	Close()
}

type IpcReadBuffer struct {
	socket *nanomsg.PullSocket
}

func (b *IpcReadBuffer) Read() ([]byte, error) {
	buf, err := b.socket.Recv(0)
	if err != nil {
		return nil, err
	}
	return buf, nil
}

func (b *IpcReadBuffer) Write(buf []byte) error {
	panic("write is not allowed for read buffer")
}

func (b *IpcReadBuffer) Close() {
	b.socket.Close()
}

func NewIpcBufferForRead(path string, maxBufSize int) (IpcBuffer, error) {
	sock, err := nanomsg.NewPullSocket()
	if err != nil {
		return nil, err
	}

	sock.SetRecvMaxSize(int64(maxBufSize))
	_, err = sock.Connect(path)
	if err != nil {
		return nil, err
	}

	ipcBuf := &IpcReadBuffer{
		socket: sock,
	}

	return ipcBuf, nil
}

// @gipan.go
type GipanDriverImpl struct {
	videoFrameBuffer utils.IpcBuffer
	audioFrameBuffer utils.IpcBuffer
	cmdInputBuffer   utils.IpcBuffer
}

func (g *GipanDriverImpl) ReadVideoFrame() ([]byte, error) {
	return g.videoFrameBuffer.Read()
}

func (g *GipanDriverImpl) ReadAudioFrame() ([]byte, error) {
	return g.audioFrameBuffer.Read()
}

...

func NewGipanDriver(imagesIpcPath, soundsIpcPath, cmdsIpcPath string) engine.GipanDriver {
    ...
    vb, err := utils.NewIpcBufferForRead(imagesIpcPath, maxVideoFrameBuffer)
	ab, err := utils.NewIpcBufferForRead(soundsIpcPath, maxAudioFrameBuffer)

	return &GipanDriverImpl{
		videoFrameBuffer: vb,
		audioFrameBuffer: ab,
		cmdInputBuffer:   cb,
	}
}

// @engine.go
func (e *GameEngine) handleVideoFrame() {
	for {
		buf, err := e.gipan.ReadVideoFrame()
		if err != nil {
			continue
		}

		e.front.WriteVideoFrame(buf)

		if e.poisonPill {
			break
		}
	}
}

func (e *GameEngine) Run(
	props *EngineProps,
	gameInfo *models.GameInfo,
	msgService func(string, interface{}),
	eventHandler func(string)) {
    ...

	// gipan -> renderer
	go e.handleAudioFrame()
	go e.handleVideoFrame()
    ...
}

 

정리

지금까지 RabbitMQ랑 Nanomsg를 대충 살펴봤는데 이 둘을 비교해보면서 정리해보자.

 

RabbitMQ

Nanomsg

브로커

필수

없음

IPC

불가

가능

프로토콜

AMQP 0-9-1

POSIX/Socket 호환

핵심

Direct, Worker, Pub/Sub, Topic, RPC

Pipeline, Req/Resp, Pair, Pub/Sub, Survey, Bus

구현언어

Erlang

C

대안

ActiveMQ, OpenAMQ, StormMQ, ...

ZeroMQ