Go

Go 언어에서 chan chan 즉, 채널을 채널로 주고받기

드리프트2 2024. 9. 19. 22:35

Go 언어에서 chan chan 즉, 채널을 채널로 주고받기

Go 언어에서 채널을 채널로 주고받을 수 있다는 것, 알고 계신가요?

 

의외로 편리해서 몇 가지 사용 예를 소개하려고 합니다.


사용 예 1: Request/Response 패턴

채널은 일반적으로 단방향 데이터 전달을 하지만, 채널을 이중으로 사용하면 응답을 받을 수 있는데요.

 

예를 들어, 처리 결과의 error를 받고 싶다면 chan chan error를 사용할 수 있습니다.

reqc := make(chan chan error)

 

요청을 보내는 쪽에서는 chan error를 만들어서 chan chan에 전송합니다.

 

이 채널로 결과가 돌아오며, 결과가 반환될 때까지 블록됩니다.

ch := make(chan error)
reqc <- ch // 요청 전송!!

err := <-ch  // 결과 대기

 

요청을 받는 쪽에서는 받은 채널에 처리 결과(error)를 반환합니다.

// 예를 들어 for-select 루프에서
for {
    select {
    case ch := <-reqc:
        err := doSomething()
        ch <- err // error 반환
    }
}

샘플 코드

package main

import (
    "fmt"
    "log"
    "time"
)

type Daemon struct {
    reqc chan chan error
}

func (d *Daemon) StartLoop() {
    go func() {
        for {
            select {
            case ch := <-d.reqc:
                err := doSomething()
                ch <- err // error 반환
            }
        }
    }()
}

func (d *Daemon) DoSomething() error {
    ch := make(chan error)
    d.reqc <- ch                 // 요청 전송!!
    if err := <-ch; err != nil { // 결과 대기
        return err
    }
    return nil
}

func doSomething() error {
    // 어떤 작업을 수행합니다.
    time.Sleep(1 * time.Second)
    return nil
}

func main() {
    d := &Daemon{
        reqc: make(chan chan error),
    }

    d.StartLoop()

    if err := d.DoSomething(); err != nil {
        log.Fatal(err)
    }

    fmt.Println("done")
}

사용 예 2: 처리 완료 대기

Request/Response와 유사하지만, 단순히 처리 완료를 대기하는 데 사용할 수 있습니다.

 

GitHub의 오픈 소스를 살펴보니 이 용례가 비교적 많았습니다.

 

결과를 반환할 필요가 없으므로 chan chan빈 구조체 타입으로 준비합니다.

reqc := make(chan chan struct{})

 

요청을 보내는 쪽에서는 chan struct{}를 만들어서 chan chan에 전송합니다.

 

처리 완료는 close로 통지되므로 채널 수신으로 이를 기다립니다.

ch := make(chan struct{})
reqc <- ch // 요청 전송!!

<-ch  // 처리 완료 대기 (채널이 close되면 재개)

 

요청을 받는 쪽에서는 처리가 완료되면 받은 채널을 close합니다.

for {
    select {
    case ch := <-reqc:
        doSomething()
        close(ch) // 처리 완료를 알리기 위한 close
    }
}

샘플 코드

package main

import (
    "fmt"
    "time"
)

type Daemon struct {
    reqc chan chan struct{}
}

func (d *Daemon) StartLoop() {
    go func() {
        for {
            select {
            case ch := <-d.reqc:
                doSomething()
                close(ch) // 처리 완료를 알리기 위한 close
            }
        }
    }()
}

func (d *Daemon) DoSomething() {
    ch := make(chan struct{})
    d.reqc <- ch
    <-ch // 처리 완료 대기
}

func doSomething() {
    // 어떤 작업을 수행합니다.
    time.Sleep(1 * time.Second)
}

func main() {
    d := &Daemon{
        reqc: make(chan chan struct{}),
    }

    d.StartLoop()

    d.DoSomething()

    fmt.Println("done")
}

사용 예 3: Subscriber 등록

어떤 의미에서는 자연스러운 용도인데요, Publisher/Subscriber 패턴에서 Subscriber(채널)를 등록하는 데 사용합니다.

샘플 코드

package main

import (
    "fmt"
    "time"
)

type PubSub struct {
    subscribe   chan chan string
    unsubscribe chan chan string
    publish     chan string
}

func (ps *PubSub) Subscribe(sub chan string) {
    ps.subscribe <- sub
}

func (ps *PubSub) Unsubscribe(sub chan string) {
    ps.unsubscribe <- sub
}

func (ps *PubSub) Publish(msg string) {
    ps.publish <- msg
}

func (ps *PubSub) Start() {
    go func() {
        subscribers := make(map[chan string]struct{})

        for {
            select {
            case ch := <-ps.subscribe:
                subscribers[ch] = struct{}{}
            case ch := <-ps.unsubscribe:
                delete(subscribers, ch)
            case msg := <-ps.publish:
                for sub := range subscribers {
                    select {
                    case sub <- msg:
                    default:
                    }
                }
            }
        }
    }()
}

func main() {
    ps := &PubSub{
        subscribe:   make(chan chan string),
        unsubscribe: make(chan chan string),
        publish:     make(chan string),
    }

    ps.Start()

    sub := make(chan string)
    ps.Subscribe(sub)

    ps.Publish("Hello, World!")

    select {
    case msg := <-sub:
        fmt.Println(msg)
    case <-time.After(1 * time.Second):
        fmt.Println("Timeout")
    }
}

사용 예 4: 순서 보장

사실 개인적으로 가장 자주 사용하는 용례인데요.

 

고루틴으로 처리를 고속화하면서도 순서를 보장하고 싶을 때 chan chan을 이용합니다.

 

예를 들어 큰 파일을 읽으면서 한 줄씩 어떤 패치(DB 액세스, RPC 등)를 수행하고 가공하는 파이프라인 처리를 생각해볼까요.

 

아래는 chan chan을 적용하기 전의 코드입니다.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// 파일에서 한 줄씩 읽어옵니다
func readSomething() <-chan string {
    outCh := make(chan string)

    go func() {
        defer close(outCh)

        for i := 0; i < 1000; i++ {
            // 파일을 읽는 대신 Sleep
            time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
            outCh <- fmt.Sprintf("line:%d", i)
        }
    }()

    return outCh
}

// 패치 & 가공
func fetchSomething(inCh <-chan string) <-chan string {
    outCh := make(chan string)

    go func() {
        defer close(outCh)

        for line := range inCh {
            // 패치하는 대신 Sleep
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            outCh <- fmt.Sprintf("%s ... fetched!", line)
        }
    }()

    return outCh
}

func main() {
    start := time.Now()

    for line := range fetchSomething(readSomething()) {
        fmt.Println(line)
    }

    fmt.Println("done", time.Since(start))
}

실행 결과

line:0 ... fetched!
line:1 ... fetched!
line:2 ... fetched!
line:3 ... fetched!
...
line:996 ... fetched!
line:997 ... fetched!
line:998 ... fetched!
line:999 ... fetched!
done 49.956134795s

 

고루틴을 사용하고 있지만 거의 직렬 처리라서 시간이 많이 걸립니다.

 

그래서 패치 처리를 고루틴으로 병렬 처리하여 고속화를 시도해볼게요.

 

아래는 chan chan이 아직 등장하지 않은 코드입니다.

// 패치 & 가공
func fetchSomething(inCh <-chan string) <-chan string {
    outCh := make(chan string)
    sem := make(chan struct{}, 10)

    go func() {
        defer close(outCh)

        for line := range inCh {
            sem <- struct{}{}

            go func(line string) {
                defer func() { <-sem }()

                // 패치하는 대신 Sleep
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                outCh <- fmt.Sprintf("%s ... fetched!", line)
            }(line)
        }

        // 세마포어가 모두 해제될 때까지 대기
        for i := 0; i < cap(sem); i++ {
            sem <- struct{}{}
        }
        close(sem)
    }()

    return outCh
}

 

무조건 모두 고루틴으로 돌리면 매우 빨라지지만, 입력 파일이 거대해지면 그만큼 메모리 등의 리소스를 소비하므로 세마포어를 사용하여 동시 실행 수를 10으로 제어합니다.

실행 결과

line:3 ... fetched!
line:8 ... fetched!
line:7 ... fetched!
line:2 ... fetched!
...
line:995 ... fetched!
line:996 ... fetched!
line:993 ... fetched!
line:999 ... fetched!
done 5.059474157s

 

빨라졌습니다!

 

하지만 잘 보면 결과의 순서가 뒤바뀌었는데요.

 

이는 패치를 위한 여러 고루틴이 순서와 상관없이 결과를 출력 채널에 쓰기 때문입니다.

 

출력 순서가 상관없다면 이대로도 괜찮지만, 때로는 출력 순서를 입력과 맞추고 싶을 때가 있죠.

 

그럴 때 chan chan의 등장입니다!

// 패치 & 가공
func fetchSomething(inCh <-chan string) <-chan string {
    outCh := make(chan string)
    outChCh := make(chan chan string, 10)

    go func() {
        defer close(outChCh)

        for line := range inCh {
            ch := make(chan string)
            outChCh <- ch

            go func(line string, ch chan string) {
                // 패치하는 대신 Sleep
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                ch <- fmt.Sprintf("%s ... fetched!", line)
                close(ch)
            }(line, ch)
        }
    }()

    go func() {
        defer close(outCh)

        for ch := range outChCh {
            outCh <- <-ch
        }
    }()

    return outCh
}

 

출력용 버퍼가 있는 chan chan을 생성하고, 상류 채널에서 데이터를 받으면 먼저 패치 처리 출력용 채널을 생성하고 이를 출력용 chan chan에 전송합니다.

 

그리고 고루틴으로 패치 처리를 수행하고 결과를 해당 채널에 전송합니다.

 

이 방식으로 입력된 순서대로 출력이 가능합니다.

실행 결과

line:0 ... fetched!
line:1 ... fetched!
line:2 ... fetched!
line:3 ... fetched!
...
line:996 ... fetched!
line:997 ... fetched!
line:998 ... fetched!
line:999 ... fetched!
done 7.275727182s

 

순서가 정상적으로 맞았습니다!

 

순서 무관 버전보다 약간 처리량이 떨어지지만, 이는 어쩔 수 없는 부분입니다.

chan chan버퍼 크기로 처리량과 메모리 리소스의 트레이드오프를 조정할 수 있습니다.


마무리하며

이렇게 chan chan의 멋진 사용 예들을 소개해드렸는데요.

 

Go 언어에서 채널의 채널을 활용하면 보다 유연하고 강력한 동시성 패턴을 구현할 수 있습니다.

 

여러분도 한 번 시도해보시면 어떨까요?