bagaimana cara mendengarkan saluran N? (pernyataan pemilihan dinamis)

116

untuk memulai loop tanpa akhir dalam mengeksekusi dua goroutine, saya dapat menggunakan kode di bawah ini:

setelah menerima pesan, itu akan memulai goroutine baru dan berlanjut selamanya.

c1 := make(chan string)
c2 := make(chan string)

go DoStuff(c1, 5)
go DoStuff(c2, 2)

for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}

Sekarang saya ingin memiliki perilaku yang sama untuk N goroutine, tetapi bagaimana pernyataan select akan terlihat dalam kasus itu?

Ini adalah bit kode yang saya mulai, tetapi saya bingung bagaimana membuat kode pernyataan pilih

numChans := 2

//I keep the channels in this slice, and want to "loop" over them in the select statemnt
var chans = [] chan string{}

for i:=0;i<numChans;i++{
    tmp := make(chan string);
    chans = append(chans, tmp);
    go DoStuff(tmp, i + 1)

//How shall the select statment be coded for this case?  
for ; true;  {
    select {
    case msg1 := <-c1:
        fmt.Println("received ", msg1)
        go DoStuff(c1, 1)
    case msg2 := <-c2:
        fmt.Println("received ", msg2)
        go DoStuff(c2, 9)
    }
}
John Smith
sumber
4
Saya rasa yang Anda inginkan adalah Multiplexing Saluran. golang.org/doc/effective_go.html#chan_of_chan Pada dasarnya, Anda memiliki satu saluran tunggal yang Anda dengarkan dan kemudian beberapa saluran anak yang menyalurkan ke saluran utama. Pertanyaan SO Terkait: stackoverflow.com/questions/10979608/…
Brenden

Jawaban:

152

Anda dapat melakukan ini menggunakan Selectfungsi dari paket refleksi :

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Select mengeksekusi operasi pemilihan yang dijelaskan oleh daftar kasus. Seperti pernyataan pilih Go, ia memblokir hingga setidaknya satu kasus dapat dilanjutkan, membuat pilihan pseudo-random yang seragam, dan kemudian menjalankan kasus tersebut. Ini mengembalikan indeks dari kasus yang dipilih dan, jika kasus itu adalah operasi penerimaan, nilai yang diterima dan boolean yang menunjukkan apakah nilai tersebut sesuai dengan pengiriman pada saluran (sebagai lawan dari nilai nol yang diterima karena saluran ditutup).

Anda meneruskan larik SelectCasestruct yang mengidentifikasi saluran untuk dipilih, arah operasi, dan nilai untuk dikirim dalam kasus operasi pengiriman.

Jadi Anda bisa melakukan sesuatu seperti ini:

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()

Anda dapat bereksperimen dengan contoh yang lebih lengkap di sini: http://play.golang.org/p/8zwvSk4kjx

James Henstridge
sumber
4
Apakah ada batasan praktis untuk jumlah kasus dalam pemilihan tersebut? Salah satu yang jika Anda melampaui itu, maka kinerja sangat terpengaruh?
Maxim Vladimirsky
4
Mungkin itu ketidakmampuan saya, tetapi saya menemukan pola ini sangat sulit untuk dikerjakan ketika Anda mengirim & menerima struktur yang kompleks melalui saluran. Meneruskan saluran "agregat" bersama, seperti yang dikatakan Tim Allclair, jauh lebih mudah dalam kasus saya.
Bora M. Alper
90

Anda dapat melakukannya dengan menggabungkan setiap saluran dalam goroutine yang "meneruskan" pesan ke saluran "agregat" bersama. Sebagai contoh:

agg := make(chan string)
for _, ch := range chans {
  go func(c chan string) {
    for msg := range c {
      agg <- msg
    }
  }(ch)
}

select {
case msg <- agg:
    fmt.Println("received ", msg)
}

Jika Anda perlu mengetahui dari saluran mana pesan berasal, Anda dapat menggabungkannya dalam struct dengan informasi tambahan sebelum meneruskannya ke saluran agregat.

Dalam pengujian (terbatas) saya, metode ini bekerja sangat baik dengan menggunakan paket refleks:

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect         1    5265109013 ns/op
BenchmarkGoSelect             20      81911344 ns/op
ok      command-line-arguments  9.463s

Kode benchmark di sini

Tim Allclair
sumber
2
Kode patokan Anda salah, Anda harus mengulangb.N dalam patokan. Jika tidak, hasil (yang dibagi dengan b.N, 1 dan 2000000000 dalam output Anda) tidak akan berarti sama sekali.
Dave C
2
@DaveC Terima kasih! Kesimpulannya tidak berubah, tapi hasilnya jauh lebih masuk akal.
Tim Allclair
1
Memang, saya melakukan retasan cepat pada kode patokan Anda untuk mendapatkan angka sebenarnya . Mungkin ada sesuatu yang masih hilang / salah dari tolok ukur ini tetapi satu-satunya hal yang lebih rumit mencerminkan kode yang terjadi adalah bahwa penyiapannya lebih cepat (dengan GOMAXPROCS = 1) karena tidak memerlukan banyak goroutine. Dalam setiap kasus lain, saluran penggabungan goroutine sederhana menghancurkan solusi pantulan (dengan ~ 2 lipat lipat).
Dave C
2
Salah satu kelemahan penting (dibandingkan dengan reflect.Selectpendekatannya) adalah goroutine melakukan buffer penggabungan minimal satu nilai pada setiap saluran yang akan digabungkan. Biasanya itu tidak akan menjadi masalah tetapi dalam beberapa aplikasi tertentu yang mungkin menjadi pemecah masalah :(.
Dave C
1
saluran gabungan buffer membuat masalah menjadi lebih buruk. Masalahnya adalah bahwa hanya solusi refleksi yang dapat memiliki semantik sepenuhnya tanpa buffer. Saya telah melanjutkan dan memposting kode tes yang saya coba sebagai jawaban terpisah untuk (semoga) mengklarifikasi apa yang ingin saya katakan.
Dave C
22

Untuk memperluas beberapa komentar pada jawaban sebelumnya dan untuk memberikan perbandingan yang lebih jelas, berikut adalah contoh dari kedua pendekatan yang disajikan sejauh ini dengan masukan yang sama, sepotong saluran untuk dibaca dan fungsi untuk memanggil setiap nilai yang juga perlu diketahui yang mana saluran asal nilai.

Ada tiga perbedaan utama antara pendekatan tersebut:

  • Kompleksitas. Meskipun ini mungkin sebagian dari preferensi pembaca, saya menemukan pendekatan saluran lebih idiomatik, langsung, dan mudah dibaca.

  • Performa. Pada sistem Xeon amd64 saya, saluran keluar goroutines + melakukan solusi refleksi sekitar dua kali lipat (secara umum refleksi di Go seringkali lebih lambat dan hanya boleh digunakan ketika benar-benar diperlukan). Tentu saja, jika ada penundaan yang signifikan baik dalam fungsi pemrosesan hasil atau dalam penulisan nilai ke saluran input, perbedaan kinerja ini dapat dengan mudah menjadi tidak signifikan.

  • Semantik pemblokiran / buffering. Pentingnya ini tergantung pada kasus penggunaan. Seringkali itu tidak masalah atau sedikit buffering ekstra dalam solusi penggabungan goroutine mungkin berguna untuk throughput. Namun, jika diinginkan untuk memiliki semantik yang hanya satu penulis yang tidak diblokir dan nilainya ditangani sepenuhnya sebelum penulis lain dibuka blokirnya, maka itu hanya dapat dicapai dengan solusi reflektif.

Catatan, kedua pendekatan dapat disederhanakan jika "id" dari saluran pengirim tidak diperlukan atau jika saluran sumber tidak akan pernah ditutup.

Saluran penggabungan Goroutine:

// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []<-chan string, fn func(int, string)) {
    // Setup
    type item struct {
        int    // index of which channel this came from
        string // the actual string item
    }
    merged := make(chan item)
    var wg sync.WaitGroup
    wg.Add(len(chans))
    for i, c := range chans {
        go func(i int, c <-chan string) {
            // Reads and buffers a single item from `c` before
            // we even know if we can write to `merged`.
            //
            // Go doesn't provide a way to do something like:
            //     merged <- (<-c)
            // atomically, where we delay the read from `c`
            // until we can write to `merged`. The read from
            // `c` will always happen first (blocking as
            // required) and then we block on `merged` (with
            // either the above or the below syntax making
            // no difference).
            for s := range c {
                merged <- item{i, s}
            }
            // If/when this input channel is closed we just stop
            // writing to the merged channel and via the WaitGroup
            // let it be known there is one fewer channel active.
            wg.Done()
        }(i, c)
    }
    // One extra goroutine to watch for all the merging goroutines to
    // be finished and then close the merged channel.
    go func() {
        wg.Wait()
        close(merged)
    }()

    // "select-like" loop
    for i := range merged {
        // Process each value
        fn(i.int, i.string)
    }
}

Pilihan refleksi:

// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value "in-flight" (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if their is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []<-chan string, fn func(int, string)) {
    // Setup
    cases := make([]reflect.SelectCase, len(chans))
    // `ids` maps the index within cases to the original `chans` index.
    ids := make([]int, len(chans))
    for i, c := range chans {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(c),
        }
        ids[i] = i
    }

    // Select loop
    for len(cases) > 0 {
        // A difference here from the merging goroutines is
        // that `v` is the only value "in-flight" that any of
        // the workers have sent. All other workers are blocked
        // trying to send the single value they have calculated
        // where-as the goroutine version reads/buffers a single
        // extra value from each worker.
        i, v, ok := reflect.Select(cases)
        if !ok {
            // Channel cases[i] has been closed, remove it
            // from our slice of cases and update our ids
            // mapping as well.
            cases = append(cases[:i], cases[i+1:]...)
            ids = append(ids[:i], ids[i+1:]...)
            continue
        }

        // Process each value
        fn(ids[i], v.String())
    }
}

[Kode lengkap di taman bermain Go .]

Dave C
sumber
1
Perlu juga dicatat bahwa solusi saluran + goroutine tidak dapat melakukan semuanya selectatau reflect.Selecttidak. Goroutine akan terus berputar sampai mereka mengkonsumsi semuanya dari saluran, jadi tidak ada cara yang jelas untuk Process1keluar lebih awal. Ada juga potensi masalah jika Anda memiliki banyak pembaca, karena goroutine menyangga satu item dari setiap saluran, yang tidak akan terjadi dengan select.
James Henstridge
@JamesHenstridge, catatan pertama Anda tentang berhenti tidak benar. Anda akan mengatur untuk menghentikan Process1 dengan cara yang sama seperti Anda mengatur untuk menghentikan Process2; misalnya dengan menambahkan saluran "stop" yang ditutup saat goroutine harus berhenti. Process1 akan membutuhkan dua kasus selectdalam satu forloop, bukan for rangeloop sederhana yang saat ini digunakan. Process2 perlu memasukkan kasus lain ke dalam casesdan menangani khusus nilai itu i.
Dave C
Itu masih belum menyelesaikan masalah bahwa Anda membaca nilai dari saluran yang tidak akan digunakan dalam kasus awal berhenti.
James Henstridge
0

Mengapa pendekatan ini tidak berhasil dengan asumsi bahwa seseorang mengirim peristiwa?

func main() {
    numChans := 2
    var chans = []chan string{}

    for i := 0; i < numChans; i++ {
        tmp := make(chan string)
        chans = append(chans, tmp)
    }

    for true {
        for i, c := range chans {
            select {
            case x = <-c:
                fmt.Printf("received %d \n", i)
                go DoShit(x, i)
            default: continue
            }
        }
    }
}
noonex
sumber
8
Ini adalah putaran-putaran. Sambil menunggu saluran input memiliki nilai, ini menghabiskan semua CPU yang tersedia. Inti dari selectbeberapa saluran (tanpa defaultklausa) adalah bahwa ia menunggu secara efisien hingga setidaknya satu saluran siap tanpa berputar.
Dave C
0

Opsi yang mungkin lebih sederhana:

Alih-alih memiliki array saluran, mengapa tidak meneruskan hanya satu saluran sebagai parameter ke fungsi yang dijalankan pada goroutine terpisah, dan kemudian mendengarkan saluran tersebut dalam goroutine konsumen?

Ini memungkinkan Anda untuk memilih hanya pada satu saluran di pendengar Anda, membuat pemilihan sederhana, dan menghindari pembuatan goroutine baru untuk mengumpulkan pesan dari beberapa saluran?

Fernando Sanchez
sumber