Go 언어에서 chan chan
즉, 채널을 채널로 주고받기
Go 언어에서 채널을 채널로 주고받을 수 있다는 것, 알고 계신가요?
의외로 편리해서 몇 가지 사용 예를 소개하려고 합니다.
사용 예 1: Request/Response 패턴
채널은 일반적으로 단방향 데이터 전달을 하지만, 채널을 이중으로 사용하면 응답을 받을 수 있는데요.
예를 들어, 처리 결과의 error
를 받고 싶다면 chan chan error
를 사용할 수 있습니다.
reqc := make(chan chan error)
요청을 보내는 쪽에서는 chan error
를 만들어서 chan chan
에 전송합니다.
이 채널로 결과가 돌아오며, 결과가 반환될 때까지 블록됩니다.
ch := make(chan error)
reqc <- ch // 요청 전송!!
err := <-ch // 결과 대기
요청을 받는 쪽에서는 받은 채널에 처리 결과(error
)를 반환합니다.
// 예를 들어 for-select 루프에서
for {
select {
case ch := <-reqc:
err := doSomething()
ch <- err // error 반환
}
}
샘플 코드
package main
import (
"fmt"
"log"
"time"
)
type Daemon struct {
reqc chan chan error
}
func (d *Daemon) StartLoop() {
go func() {
for {
select {
case ch := <-d.reqc:
err := doSomething()
ch <- err // error 반환
}
}
}()
}
func (d *Daemon) DoSomething() error {
ch := make(chan error)
d.reqc <- ch // 요청 전송!!
if err := <-ch; err != nil { // 결과 대기
return err
}
return nil
}
func doSomething() error {
// 어떤 작업을 수행합니다.
time.Sleep(1 * time.Second)
return nil
}
func main() {
d := &Daemon{
reqc: make(chan chan error),
}
d.StartLoop()
if err := d.DoSomething(); err != nil {
log.Fatal(err)
}
fmt.Println("done")
}
사용 예 2: 처리 완료 대기
Request/Response와 유사하지만, 단순히 처리 완료를 대기하는 데 사용할 수 있습니다.
GitHub의 오픈 소스를 살펴보니 이 용례가 비교적 많았습니다.
결과를 반환할 필요가 없으므로 chan chan
은 빈 구조체 타입으로 준비합니다.
reqc := make(chan chan struct{})
요청을 보내는 쪽에서는 chan struct{}
를 만들어서 chan chan
에 전송합니다.
처리 완료는 close
로 통지되므로 채널 수신으로 이를 기다립니다.
ch := make(chan struct{})
reqc <- ch // 요청 전송!!
<-ch // 처리 완료 대기 (채널이 close되면 재개)
요청을 받는 쪽에서는 처리가 완료되면 받은 채널을 close
합니다.
for {
select {
case ch := <-reqc:
doSomething()
close(ch) // 처리 완료를 알리기 위한 close
}
}
샘플 코드
package main
import (
"fmt"
"time"
)
type Daemon struct {
reqc chan chan struct{}
}
func (d *Daemon) StartLoop() {
go func() {
for {
select {
case ch := <-d.reqc:
doSomething()
close(ch) // 처리 완료를 알리기 위한 close
}
}
}()
}
func (d *Daemon) DoSomething() {
ch := make(chan struct{})
d.reqc <- ch
<-ch // 처리 완료 대기
}
func doSomething() {
// 어떤 작업을 수행합니다.
time.Sleep(1 * time.Second)
}
func main() {
d := &Daemon{
reqc: make(chan chan struct{}),
}
d.StartLoop()
d.DoSomething()
fmt.Println("done")
}
사용 예 3: Subscriber 등록
어떤 의미에서는 자연스러운 용도인데요, Publisher/Subscriber 패턴에서 Subscriber(채널)를 등록하는 데 사용합니다.
샘플 코드
package main
import (
"fmt"
"time"
)
type PubSub struct {
subscribe chan chan string
unsubscribe chan chan string
publish chan string
}
func (ps *PubSub) Subscribe(sub chan string) {
ps.subscribe <- sub
}
func (ps *PubSub) Unsubscribe(sub chan string) {
ps.unsubscribe <- sub
}
func (ps *PubSub) Publish(msg string) {
ps.publish <- msg
}
func (ps *PubSub) Start() {
go func() {
subscribers := make(map[chan string]struct{})
for {
select {
case ch := <-ps.subscribe:
subscribers[ch] = struct{}{}
case ch := <-ps.unsubscribe:
delete(subscribers, ch)
case msg := <-ps.publish:
for sub := range subscribers {
select {
case sub <- msg:
default:
}
}
}
}
}()
}
func main() {
ps := &PubSub{
subscribe: make(chan chan string),
unsubscribe: make(chan chan string),
publish: make(chan string),
}
ps.Start()
sub := make(chan string)
ps.Subscribe(sub)
ps.Publish("Hello, World!")
select {
case msg := <-sub:
fmt.Println(msg)
case <-time.After(1 * time.Second):
fmt.Println("Timeout")
}
}
사용 예 4: 순서 보장
사실 개인적으로 가장 자주 사용하는 용례인데요.
고루틴으로 처리를 고속화하면서도 순서를 보장하고 싶을 때 chan chan
을 이용합니다.
예를 들어 큰 파일을 읽으면서 한 줄씩 어떤 패치(DB 액세스, RPC 등)를 수행하고 가공하는 파이프라인 처리를 생각해볼까요.
아래는 chan chan
을 적용하기 전의 코드입니다.
package main
import (
"fmt"
"math/rand"
"time"
)
// 파일에서 한 줄씩 읽어옵니다
func readSomething() <-chan string {
outCh := make(chan string)
go func() {
defer close(outCh)
for i := 0; i < 1000; i++ {
// 파일을 읽는 대신 Sleep
time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
outCh <- fmt.Sprintf("line:%d", i)
}
}()
return outCh
}
// 패치 & 가공
func fetchSomething(inCh <-chan string) <-chan string {
outCh := make(chan string)
go func() {
defer close(outCh)
for line := range inCh {
// 패치하는 대신 Sleep
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
outCh <- fmt.Sprintf("%s ... fetched!", line)
}
}()
return outCh
}
func main() {
start := time.Now()
for line := range fetchSomething(readSomething()) {
fmt.Println(line)
}
fmt.Println("done", time.Since(start))
}
실행 결과
line:0 ... fetched!
line:1 ... fetched!
line:2 ... fetched!
line:3 ... fetched!
...
line:996 ... fetched!
line:997 ... fetched!
line:998 ... fetched!
line:999 ... fetched!
done 49.956134795s
고루틴을 사용하고 있지만 거의 직렬 처리라서 시간이 많이 걸립니다.
그래서 패치 처리를 고루틴으로 병렬 처리하여 고속화를 시도해볼게요.
아래는 chan chan
이 아직 등장하지 않은 코드입니다.
// 패치 & 가공
func fetchSomething(inCh <-chan string) <-chan string {
outCh := make(chan string)
sem := make(chan struct{}, 10)
go func() {
defer close(outCh)
for line := range inCh {
sem <- struct{}{}
go func(line string) {
defer func() { <-sem }()
// 패치하는 대신 Sleep
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
outCh <- fmt.Sprintf("%s ... fetched!", line)
}(line)
}
// 세마포어가 모두 해제될 때까지 대기
for i := 0; i < cap(sem); i++ {
sem <- struct{}{}
}
close(sem)
}()
return outCh
}
무조건 모두 고루틴으로 돌리면 매우 빨라지지만, 입력 파일이 거대해지면 그만큼 메모리 등의 리소스를 소비하므로 세마포어를 사용하여 동시 실행 수를 10으로 제어합니다.
실행 결과
line:3 ... fetched!
line:8 ... fetched!
line:7 ... fetched!
line:2 ... fetched!
...
line:995 ... fetched!
line:996 ... fetched!
line:993 ... fetched!
line:999 ... fetched!
done 5.059474157s
꽤 빨라졌습니다!
하지만 잘 보면 결과의 순서가 뒤바뀌었는데요.
이는 패치를 위한 여러 고루틴이 순서와 상관없이 결과를 출력 채널에 쓰기 때문입니다.
출력 순서가 상관없다면 이대로도 괜찮지만, 때로는 출력 순서를 입력과 맞추고 싶을 때가 있죠.
그럴 때 chan chan
의 등장입니다!
// 패치 & 가공
func fetchSomething(inCh <-chan string) <-chan string {
outCh := make(chan string)
outChCh := make(chan chan string, 10)
go func() {
defer close(outChCh)
for line := range inCh {
ch := make(chan string)
outChCh <- ch
go func(line string, ch chan string) {
// 패치하는 대신 Sleep
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
ch <- fmt.Sprintf("%s ... fetched!", line)
close(ch)
}(line, ch)
}
}()
go func() {
defer close(outCh)
for ch := range outChCh {
outCh <- <-ch
}
}()
return outCh
}
출력용 버퍼가 있는 chan chan
을 생성하고, 상류 채널에서 데이터를 받으면 먼저 패치 처리 출력용 채널을 생성하고 이를 출력용 chan chan
에 전송합니다.
그리고 고루틴으로 패치 처리를 수행하고 결과를 해당 채널에 전송합니다.
이 방식으로 입력된 순서대로 출력이 가능합니다.
실행 결과
line:0 ... fetched!
line:1 ... fetched!
line:2 ... fetched!
line:3 ... fetched!
...
line:996 ... fetched!
line:997 ... fetched!
line:998 ... fetched!
line:999 ... fetched!
done 7.275727182s
순서가 정상적으로 맞았습니다!
순서 무관 버전보다 약간 처리량이 떨어지지만, 이는 어쩔 수 없는 부분입니다.
chan chan
의 버퍼 크기로 처리량과 메모리 리소스의 트레이드오프를 조정할 수 있습니다.
마무리하며
이렇게 chan chan
의 멋진 사용 예들을 소개해드렸는데요.
Go 언어에서 채널의 채널을 활용하면 보다 유연하고 강력한 동시성 패턴을 구현할 수 있습니다.
여러분도 한 번 시도해보시면 어떨까요?
'Go' 카테고리의 다른 글
Go 언어에서 구조체 필드를 반드시 지정하여 초기화하는 방법 알아볼까요? (0) | 2024.09.19 |
---|---|
Go 언어 time.Timer#Reset() 완벽 가이드: 올바른 사용법 알아볼까요? (0) | 2024.09.19 |
Go 실행 파일에 ZIP으로 리소스 임베딩하기: 간단하게 알아볼까요? (0) | 2024.09.19 |
Go 언어 fmt.Printf 완벽 가이드 (0) | 2024.09.19 |
Go로 CUI 툴을 쉽고 편하게! gocui 사용기 (0) | 2024.09.19 |