Go

Go의 채널 처리 패턴 모음

드리프트2 2024. 4. 13. 12:19

 

안녕하세요?

 

오늘은 Go의 채널 처리 패턴에 대해 알아보겠습니다.

 

** 목 차 **


Go의 장점 중 하나는 고루틴을 통한 비동기 처리입니다.

 

하지만 채널 다루기에 있어서 마술 같은 패턴이 등장하기 쉽습니다.

 

이를 최소화하고자 채널 관련 패턴을 정리해보았습니다.

 

치트시트처럼 활용하시면 좋을 것 같습니다.

Go의 채널 기초


입문 자료로 활용하실 수 있도록 본론에 들어가기 전 채널의 기본을 간단히 정리했습니다.

정의

채널에는 용량(capacity)이라는 개념이 있습니다.

 

용량은 채널 내에서 버퍼링할 수 있는 크기를 의미하며, 용량이 꽉 차면 송신 측이 대기(블록)하게 됩니다.

 

용량 없는 정의(int형, 용량 0)

ch := make(chan int)

 

 

용량 있는 정의(int형, 용량 10)

ch := make(chan int, 10)

close

채널은 close 함수로 닫을 수 있습니다.

 

수신 측에서 이를 감지할 수 있으므로 수신 측에 종료 신호를 보내는 데 사용합니다.

close(ch)

송신

채널에 정의한 타입의 값을 보낼 수 있습니다.

 

앞서 언급했듯이, 용량이 없다면 수신될 때까지 블록됩니다.

ch <- 1

수신

수신 방식은 6가지 패턴이 있습니다.

 

수신하거나 close된 것만 감지하는 패턴.

 

비동기 처리의 종료를 기다릴 때 자주 사용됩니다.

<-ch

 

 

값을 가져오는 패턴.

 

close되면 0값이 들어갑니다.

v := <-ch

 

 

값이 들어왔는지 close되었는지 구분하는 패턴.

 

ok가 false면 close된 것입니다.

v, ok := <-ch
if !ok {
    // closed
}

 

 

select-case로 기다리는 패턴.

 

순서는 보장되지 않지만 여러 채널을 동시에 기다릴 수 있습니다.

select {
case v := <-ch1:
    // ch 1
case v := <-ch2:
    // ch 2
}

 

 

default를 붙이면 값이 있는 채널이 없으면 대기하지 않고 default가 실행됩니다.

select {
case v := <-ch1:
    // ch 1
case v := <-ch2:
    // ch 2
default:
    // default
}

 

 

for-range로 처리할 수도 있습니다. close되면 반복문을 빠져나옵니다.

for v := range ch {
    // do something
}
// closed

주의해야 할 사항

채널 사용에 있어 주의해야 할 사항입니다.

 

이러한 문제를 해결하기 위해 '마술 같은' 패턴이 등장하곤 합니다.

  • 닫힌 채널에 이벤트를 보내면 panic이 발생합니다.
  • 채널을 중복으로 닫아도 panic이 발생합니다.
  • 수신측이 먼저 사라지면 계속 대기 상태에 빠집니다.

Go에서는 이런 모호한 상황을 없애고자 채널 작업 시 panic을 발생시킵니다.


일반적인 안티패턴

실용적인 패턴 모음에 들어가기 전에, 자주 볼 수 있는 안티패턴을 소개하겠습니다.

 

이러한 패턴은 계속 블록되거나 panic을 일으킬 수 있습니다.

종료를 수신자에게 알리기 위해 종료 이벤트를 보내기

간단한 처리라면 잘 작동할 수 있지만, 수신자가 종료용 채널 외 다른 무언가로 대기를 빠져나간 경우 길을 잃게 됩니다.

func A(done chan struct{}) {
    // do something
    done <- struct{}{}
}

func B() {
    done := make(chan struct{})
    go A(done)

    // do something
    // 여기서 return이나 panic이 일어나면 A의 goroutine이 받는 곳이 없어 계속 대기함

    <-done
}

 

 

close를 사용하는 것이 좋습니다.

func A(done chan struct{}) {
    // do something
    close(done)
}

func B() {
    done := make(chan struct{})
    go A(done)

    // do something

    <-done
}

 

 

가끔 select-case로 이런 코드를 볼 수 있지만, 앞서 말한 것처럼 계속 블록될 수 있습니다.

func worker(jobChan chan Job, done chan struct{}) {
    for {
        select {
        case j := <-jobChan:
            // do something
        case <-done:
            return
        }
    }
}

func B(jobs []Job) {
    jobChan := make(chan Job)
    done := make(chan struct{})
    go worker(jobChan, done)

    for _, j := range jobs {
        jobChan <- j
    }

    done <- struct{}{}
}

 

 

위와 같은 단순한 처리라면 가능한 한 for-range를 사용하는 것이 좋습니다.

 

close되면 빠져나올 수 있습니다.

 

샘플은 다음과 같습니다.

func worker(jobChan chan Job) {
    for j := range jobChan {
        // do something
    }
}

func B(jobs []Job) {
    jobChan := make(chan Job)
    defer close(jobChan)
    go worker(jobChan)

    for _, j := range jobs {
        jobChan <- j
    }
}

 

close될 가능성이 있는 채널을 첫 번째 반환값으로만 받기

 

select로 대기하는 경우 실수하기 쉽습니다.

 

close되면 첫 번째 반환값이 영(zero)값이 되므로 예기치 않은 동작이 발생할 수 있습니다.

func A() {
    select {
    case v := <-valueORCloseChan:
        // do something with v
        // 채널이 close되면 v가 영(zero)값이 됨
    case v := <-otherChan:
        // do something
    }
}

 

 

ok로 판단하고 분기시킵니다.

func A() {
    select {
    case v, ok := <-valueORCloseChan:
        if !ok {
            // closed
        }
        // do something with v
    case v := <-otherChan:
        // do something
    }
}

goroutine에서 여러 이벤트가 올 수 있는데 처음 것만 받고 빠져나가기

에러용 채널을 기다리는 경우 자주 있습니다.

 

처음 것 외에는 길을 잃게 됩니다.

 

특히 capacity가 0인 채널의 goroutine은 계속 블록됩니다.

func A(wg *sync.WaitGroup, errChan chan error) {
    defer wg.Done()
    if err := doSomething(); err != nil {
        // 여기가 여러 번 발생하면 걸림
        errChan <- err
    }
}


func B() error {
    wg := new(sync.WaitGroup)
    errChan := make(chan error)
    done := make(chan struct{})

    wg.Add(2)
    go A(wg, errChan)
    go A(wg, errChan)

    go func() {
        wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return nil
    case err := <-errChan:
        return err
    }
}

 

 

받는 것이 하나면 되는 경우, 이벤트 전송 측을 sync.Once로 한 번만 보내도록 통일하는 것이 편리합니다.

 

그렇지 않으면 sync.Once를 전달하는 것이 복잡해지므로, 설계 자체가 복잡한 것은 아닌지 의심해볼 필요가 있습니다.

var once sync.Once

func A(wg *sync.WaitGroup, errChan chan error) {
    defer wg.Done()
    if err := doSomething(); err != nil {
        once.Do(func() {
            errChan <- err
        })
    }
}


func B() error {
    wg := new(sync.WaitGroup)
    errChan := make(chan error)
    done := make(chan struct{})

    wg.Add(2)
    go A(wg, errChan)
    go A(wg, errChan)

    go func() {
        wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        return nil
    case err := <-errChan:
        return err
    }
}

 

 

select 발화 순서는 보장되지 않으므로 약간 위험한 코드입니다.

 

진지하게 하려면 done이 아니라 errChan을 close하는 것이 좋습니다.

 

에러를 수집하는 경우 errgroup이 좋습니다. sync.WaitGroup과 유사한 사용감이지만, context를 사용하여 하나라도 실패하면 다른 처리를 취소하는 기능이 있어 매우 편리합니다.

 

채널이 닫혔는지 확인하고 싶은 경우

여러 전송측 처리가 있고 수신은 하나만 필요한 경우 필요할 수 있습니다.

 

하지만 현재로서는 편리한 인터페이스가 제공되고 있지 않습니다.

 

수신해 보고 두 번째 반환값이 false면 닫힌 것이지만, 수신하면 첫 번째 반환값을 일반적으로 받게 되므로 좋지 않습니다.

 

일단 다음과 같이 구현해 보았지만, 안티패턴 냄새가 심합니다.

func isClosed(ch chan Value) bool {
    select {
    case v, ok := <-ch:
        if !ok {
            return true
        }
        go func() {
            ch <- v
        }()
    default:
    }
    return false
}

func A(ch chan Value) {
    if !isClosed(ch) {
        ch <- SomeValue()
    }
}

 

위의 코드는 lock을 포함하고 있지 않아, 검사 후 close될 가능성이 있으므로 실용적이지 않습니다.

 

lock 처리까지 넣으면 복잡한 블랙마술이 되므로, 다른 방법을 찾는 것이 좋습니다.

 

고속으로 보내기

예를 들어 단순한 slice 처리에 비해 성능이 나쁠 수밖에 없으므로, 고성능이 필요한 부분에서는 벤치마크를 수행하여 검증하는 것이 좋습니다.

 

약간 대략적이지만 단순하게 slice로 입출력하는 경우와 채널을 거쳐 입출력하는 경우의 벤치마크를 해보았습니다.

func BenchmarkWithSimpleSlice(b *testing.B) {
    src := make([]int, 10000)
    dst := make([]int, 10000)
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // write
        for i := range src {
            dst[i] = src[i]
        }
        // read
        for i := 0; i < len(src); i++ {
            _ = dst[i]
        }
    }
}

func BenchmarkWithChannel(b *testing.B) {
    src := make([]int, 10000)
    ch := make(chan int, 10000)
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // write
        for i := range src {
            ch <- src[i]
        }
        // read
        for i := 0; i < len(src); i++ {
            <-ch
        }
    }
}

 

 

결과는 다음과 같습니다.

BenchmarkWithSimpleSlice       2000000              8991 ns/op
BenchmarkWithChannel             30000            580989 ns/op

 

 

약 570,000 ns/op 정도의 차이가 나므로, 약 57 ns/item 정도의 차이가 있습니다.

 

벤치마크로서의 타당성은 별개로 하더라도, 고성능이 필요한 곳에서는 어느 정도 고려가 필요할 것 같습니다.


채널로 병렬 수 관리하는 경우의 패턴

여기부터는 실용적인 패턴 모음입니다.

 

병렬 수가 10이라면 다음과 같습니다.

func A(jobs []Job) {
    sem := make(chan struct{}, 10)
    for _, j := range jobs {
        sem <- struct{}{}
        go func() {
            j.Run()
            <-sem
        }()
    }
}

 

 

capacity 기능을 사용하여 블록시킴으로써 병렬 수 관리를 하고 있습니다.

 

worker 용도로 사용하는 경우, queue 획득 타이밍에 주의합니다.

 

채널에 넣기 전에 queue를 획득하면 잡고 있는 채로 블록됩니다.

 

고려하려면 다음과 같은 식으로 하면 됩니다.

func A(jobs chan Job) {
    sem := make(chan struct{}, 10)
    for {
        sem <- struct{}{}
        j, ok := <-jobs
        if !ok {
            break
        }
        go func() {
            j.Run()
            <-sem
        }()
    }
}

 

 

가중치가 필요한 경우 semaphore가 편리합니다.


채널로 처리 종료를 기다리는 패턴

단순히 기다리기만 하면 됩니다.

 

close하여 사용하는 것이 좋습니다.

 

참고로 값이 필요 없는 경우 struct{}가 크기 0의 데이터이므로 좋습니다.

func A(done chan struct{}) {
    defer close(done)
    // do something
}

func B() {
    done := make(chan struct{})
    go A(done)

    // wait
    <-done
}

채널의 close를 감지하는 패턴

앞서 말한 바와 같이 안티패턴이지만, close만 하는 채널은 사용할 수 있는 경우가 있습니다.

 

특히 메서드 내에서 context의 상태를 확인하는 데 가끔 사용합니다.

 

cancel되었는지 확인하고 싶을 때 Done()으로 반환되는 chan이 닫혔는지 판단하고 싶은 경우입니다.

 

select-case의 default를 사용합니다.

var isDone bool
select {
    case <-ctx.Done():
        isDone = true
    default:
        isDone = false
}

 

 

이런 식입니다. 메서드화해서 사용하기도 합니다.

func isDone(ctx context.Context) bool {
    select {
        case <-ctx.Done():
            return true
        default:
            return false
    }
}

 

 

가끔 Done 감지 샘플에 default가 없는 것이 있는데, select 문에는 default가 없으면 어느 채널에서든 이벤트가 올 때까지 블록되므로 주의해야 합니다.


채널에서 반복 처리하는 경우의 패턴 모음

기본적으로 송신 측을 먼저 처리하고 -> 채널을 닫은 뒤 -> 수신 측이 대기에서 빠져나오는 순서로 처리하는 것이 중요합니다.

 

select 대기도 사용할 수 있지만, 가능한 한 for-range와 defer를 사용하는 것이 좋습니다.

송신 측 1개, 수신 측 1개

병렬 처리할 만큼은 아니지만 수신 처리를 빨리 마무리하고 지연 처리를 하고 싶은 단순한 패턴입니다.

 

채널의 용량이 넘치지 않는 선에서 수신 측이 잘 처리하고 있는(따라가고 있는) 경우에는 이것으로 충분합니다.

func worker(queue chan Job) {
    for j := range queue {
        // do something
    }
}

func A() error {
    queue := make(chan Job, 100)
    defer close(queue)

    go worker(queue)

    for {
        j, err := someHandle()
        if err != nil {
            return err
        }
        queue <- j
    }
    return nil
}

 

 

따라가지 않게 되면 다음과 같이 수신 측을 늘려나갑니다.

송신 측 1개, 수신 측 다수

수신 측의 처리가 1개로는 따라가기 힘든 경우에 자주 쓰는 워커 스타일입니다.

 

단순한 goroutine 실행과의 차이는 병렬 수를 관리한다는 것입니다.

 

송신 측이 1개이므로, 수신 측이 여러 개로 늘어나도 채널을 닫아주기만 하면 문제 없습니다.

func worker(queue chan Job) {
    for j := range queue {
        // do something
    }
}

const workerNum = 10

func A() error {
    queue := make(chan Job, 100)
    defer close(queue)

    for i := 0; i < workerNum; i++ {
        go worker(queue)
    }

    for {
        j, err := someHandle()
        if err != nil {
            return err
        }
        queue <- j
    }
    return nil
}

 

송신 측 다수, 수신 측 1개

병렬 처리 결과를 수집하고 싶은 경우에 자주 쓰는 패턴입니다.

 

취소든 정상 종료든, 모든 송신 처리가 끝났음을 확인하고 나서 수신 측을 빠져나와야 합니다.

 

송신 처리의 goroutine이 몇 개 남아 있는지 세어주고, 모두 정리되면 채널을 닫아주면 됩니다.

 

atomic 계열로 세어줘도 되지만, sync.WaitGroup을 선호합니다.

 

페이징 식 요청을 분할해서 병렬 실행하고 결과를 배열에 모으는 패턴입니다.

func someRequest(from, to uint, resultChan chan Result) {
    results := request(from, to)
    for _, r := range results {
        resultChan <- r
    }
}

func A() []Result {
    resultChan := make(chan Result, 100)
    wg := new(sync.WaitGroup)

    for i := 0; i < 99; i++ {
        wg.Add(1)
        from := i * 100
        to := (i + 1) * 100
        go func() {
            defer wg.Done()
            someRequest(from, to, resultChan)
        }()
    }

    go func() {
        wg.Wait()
        close(resultChan)
    }()

    results := make([]Result, 0, 10000)
    for _, r := range resultChan {
        results = append(results, r)
    }

    return results
}

 

각 송신 측에 취소를 전달하고 싶다면 전용 채널을 전달해주는 것도 좋지만, context를 전달하는 것이 편리합니다.

 

외부 명령이나 외부 요청에도 그대로 전달할 수 있기 때문입니다.

func someRequest(ctx context.context, from, to uint, resultChan chan Result) {
    results := request(ctx, from, to)
    for _, r := range results {
        resultChan <- r
    }
}

func A(ctx context.Context) []Result {
    resultChan := make(chan Result, 100)
    wg := new(sync.WaitGroup)

    for i := 0; i < 99; i++ {
        wg.Add(1)
        from := i * 100
        to := (i + 1) * 100
        go func() {
            defer wg.Done()
            someRequest(ctx, from, to, resultChan)
        }()
    }

    go func() {
        wg.Wait()
        close(resultChan)
    }()

    results := make([]Result, 0, 10000)
    for _, r := range resultChan {
        results = append(results, r)
    }

    return results
}

 

송신 측 다수, 수신 측 다수

 

Dispatcher - Worker 스타일입니다.

 

Dispatcher를 직접 만드는 샘플이 많이 보이지만, 타입에 따라 전달할 worker를 변경하는 등의 분기가 필요없다면 채널 자체가 dispatcher가 됩니다.

 

기본적으로 수신 측이 늘어나도 채널을 닫아주면 문제 없습니다.

func worker(queue chan Job) {
    for j := range queue {
        // do something
    }
}

func someRequest(from, to uint, callbackJobQueue chan Job) {
    results := request(from, to)
    for _, r := range results {
        callbackQueue <- NewJob(r)
    }
}

const (
    workerNum = 10
)

func A() {
    queue := make(chan Job, 100)
    defer close(queue)
    wg := new(sync.WaitGroup)

    for i := 0; i < workerNum; i++ {
        go worker(queue)
    }
    for i := 0; i < 99; i++ {
        wg.Add(1)
        from := i * 100
        to := (i + 1) * 100
        go func() {
            defer wg.Done()
            someRequest(from, to, queue)
        }()
    }

    wg.Wait()
}

 

 

결과를 수집하려면 worker에 수집용 채널을 전달하면 됩니다.


select 대기에 대해

기본적으로 for-range를 추천합니다.

 

select를 사용하는 경우는 다음과 같습니다.

  • 루프가 아니라 한 번만 발동하는 경우
    • 모든 것이 닫힐 때까지 대기
      • 어떤 것이 닫히면 처리를 빠져나가는 경우. 단, 발동 순서가 보장되지 않아 사용처가 많이 없습니다.
    • 대기하는 여러 채널 중 하나만 반환된다는 것이 보장되는 경우
      • 비동기 처리에서 resolve나 reject 중 하나만 반환되는 경우 등.
    • 하나만 받으면 그만인 경우
      • 예를 들어 시그널 처리. 프로세스를 종료하려면 하나만 받으면 됩니다.
        • 시그널
  • 빠져나갈 생각이 없는 루프
    • 프로세스가 죽을 때까지 계속 돌려야 하는 경우 여러 가지를 신경 쓰지 않아도 됩니다.
  • 송신 측이 정말로 다 정리되었음을 확인할 수 있는 경우
    • 앞서의 송신 측 다수, 수신 측 1개 패턴과 같이 sync.WaitGroup 등으로 관리할 수 있는 경우입니다. 그러나 for-range와 달리 모든 채널이 수신 완료되었음을 보장하지 않아 수신 누락이 생길 수 있어 복잡합니다.
    • 각각 for-range로 조직하는 것이 더 단순해지는 경우가 많습니다(개인적 의견).

수신 누락에 대한 고민이 많은 채널 루프 경우, for-range에 기대는 것이 편할 것 같습니다.

 

timeout 처리나 context.Done 처리는 골치 아픈 부분이지만, 가능한 한 context 기능에 의지하고, 정말 필요한 곳에서만 수신 누락을 고려한 select 대기를 하는 것이 좋습니다.

 

capacity 없는 결과용 채널을 반환하고, timeout 처리가 제대로 구현되지 않은 메서드가 있어도 context 대응으로 간단하게 사용할 수 있는 wrapper를 만들면 좋습니다.

 

참고로 흔한 실수로, select in for loop에서 닫힌 채널이 있으면 무한 루프가 됩니다.

func main() {
    ch1 := make(chan struct{})
    ch2 := make(chan struct{})
    close(ch1)

    for {
        select {
        case <- ch1:
            log.Println("1")
        case <- ch2:
            log.Println("2")
        }
    }
}

 

 

위와 같이 select-case에서 닫힌 채널이 있으면 계속해서 "1"이 출력됩니다.

 

다단계 이벤트 대기에서 수신 측에서 송신 측으로 송신 중지를 전파하고 싶은 경우에 자주 그렇게 합니다.

func A(ch1 chan Event, stopA chan struct{}) {
    defer close(ch1)

    t := time.NewTicker(3 * time.Second)
    defer t.Stop()

    for {
        select {
        case <-t.C:
            ch1 <- handle()
        case <-stopA:
            return
        }
    }
}

func B(ch1 chan Event, ch2 chan Job, stopB chan struct{}, stopA chan struct{}) {
    defer close(ch2)
    for {
        select {
        case e, ok := <-ch1:
            if !ok {
                return
            }
            ch2 <- NewJob(e)
        case <-stopB:
            close(stopA)
        }
    }
}

func C(ch2 chan Job, stopB chan struct{}) {
    for j := range ch2 {
        if err := j.Run(); err != nil {
            close(stopB)
        }
    }
}

 

위와 같은 Event를 받아서 Job을 dispatch하는 코드에서, B의 stopB 케이스가 연속 발동하여 stopA를 중복 닫아 panic이 발생합니다.

 

대응 방법으로는 채널별로 대기하는 goroutine을 분리하거나, nil로 덮어쓰고 case 판정에서 제외하는 등의 방법이 있습니다.

 

개인적으로는 설계에 문제가 있다고 판단되어 리팩토링하고 싶어집니다.


에러 수집(하나만 잡으면 됨)

errgroup을 사용하면 편합니다.

에러 수집(여러 개 모음)

for-range로 각 처리에서 error를 모으고, 전체 처리가 끝나면 채널을 닫습니다.

func A() []error {
    errChan := make(chan error, 100)
    wg := new(sync.WaitGroup)

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if err := doSomething(); err != nil {
                errChan <- err
            }
        }()
    }

    go func() {
        wg.Wait()
        close(errChan)
    }()

    errors := make([]error, 0, 100)
    for r := range errChan {
        errors = append(errors, r)
    }

    return errors
}

에러 수집(결과도 함께 수집)

결과와 에러를 하나의 구조체로 정리하면 에러만 수집하는 것과 동일한 방식으로 작성할 수 있습니다.

type Wrapper struct {
    Result Result
    Error  error
}

func A() []Wrapper {
    resultChan := make(chan Wrapper, 100)
    wg := new(sync.WaitGroup)

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            result, err := doSomething();
            resultChan <- Wrapper{
                Result: result,
                Error:  err,
            }
        }()
    }

    go func() {
        wg.Wait()
        close(resultChan)
    }()

    results := make([]Wrapper, 0, 100)
    for r := range resultChan {
        results = append(results, r)
    }

    return results
}

 

 

채널을 여러 개 만들어 병렬로 처리하는 방법도 있습니다.

 

다만 채널의 capacity에 주의하지 않으면 블록이 발생할 수 있습니다(아래 예시에서는 errChan을 먼저 처리하므로, resultChan이 가득 차면 막히게 됩니다).

func A() ([]Result, []error) {
    errChan := make(chan error, 100)
    resultChan := make(chan Result, 100)
    wg := new(sync.WaitGroup)

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            result, err := doSomething();
            if err != nil {
                errChan <- err
                return
            }
            resultChan <- result
        }()
    }

    go func() {
        wg.Wait()
        close(errChan)
        close(resultChan)
    }()

    errors := make([]error, 0, 100)
    for r := range errChan {
        errors = append(errors, r)
    }
    results := make([]Result, 0, 100)
    for r := range resultChan {
        results = append(results, r)
    }

    return results, errors
}

 

전자의 방법이 더 안전하지만, 개인적으로는 후속 처리에서 결과와 에러를 분리할 수 있다면 capacity에 주의하면서 후자의 방법도 괜찮다고 생각합니다.

이상

이 정도의 패턴을 조합하면 자주 나오는 처리를 작성할 수 있을 것 같습니다.

 

잘못된 점이나 개선할 점이 있다면 언제든 지적해 주시기 바랍니다.