Bagaimana saya bisa membuat konstruksi universal lebih efisien?

16

"Konstruksi universal" adalah kelas pembungkus untuk objek berurutan yang memungkinkannya untuk linierisasi (kondisi konsistensi yang kuat untuk objek bersamaan). Misalnya, berikut ini adalah konstruksi bebas-tunggu yang disesuaikan, di Jawa, dari [1], yang mengasumsikan keberadaan antrian tunggu-bebas yang memenuhi antarmuka WFQ(yang hanya membutuhkan konsensus satu kali antar thread) dan mengasumsikan Sequentialantarmuka:

public interface WFQ<T> // "FIFO" iteration
{
    int enqueue(T t); // returns the sequence number of t
    Iterable<T> iterateUntil(int max); // iterates until sequence max
}
public interface Sequential
{
    // Apply an invocation (method + arguments)
    // and get a response (return value + state)
    Response apply(Invocation i); 
}
public interface Factory<T> { T generate(); } // generate new default object
public interface Universal extends Sequential {}

public class SlowUniversal implements Universal
{
    Factory<? extends Sequential> generator;
    WFQ<Invocation> wfq = new WFQ<Invocation>();
    Universal(Factory<? extends Sequential> g) { generator = g; } 
    public Response apply(Invocation i)
    {
        int max = wfq.enqueue(i);
        Sequential s = generator.generate();
        for(Invocation invoc : wfq.iterateUntil(max))
            s.apply(invoc);
        return s.apply(i);
    }
}

Implementasi ini tidak terlalu memuaskan karena sangat lambat (Anda ingat setiap permintaan, dan harus mengulangnya di setiap penerapan - kami memiliki runtime linier dalam ukuran histori). Apakah ada cara yang kita bisa memperpanjang WFQdan Sequentialantarmuka (dengan cara yang masuk akal) untuk memungkinkan kita untuk menyimpan beberapa langkah ketika menerapkan doa baru?

Bisakah kita menjadikan ini lebih efisien (bukan runtime linier dalam ukuran histori, lebih disukai penggunaan memori juga turun) tanpa kehilangan properti bebas-tunggu?

Klarifikasi

"Konstruksi universal" adalah istilah yang saya cukup yakin dibuat oleh [1] yang menerima objek yang tidak aman tetapi kompatibel dengan utas, yang digeneralisasikan oleh Sequentialantarmuka. Menggunakan antrian tunggu-bebas, konstruksi pertama menawarkan thread-safe, linearizable dan juga bebas-tunggu (ini mengasumsikan determinisme dan menghentikan applyoperasi).

Ini tidak efisien, karena metode ini efektif untuk membuat setiap utas lokal mulai dari yang bersih dan menerapkan setiap operasi yang pernah direkam. Bagaimanapun, ini bekerja karena ini mencapai sinkronisasi secara efektif dengan menggunakan WFQuntuk menentukan urutan di mana semua operasi harus diterapkan: setiap panggilan thread applyakan melihat lokal yang samaSequential objek , dengan urutan Invocations yang sama diterapkan untuk itu.

Pertanyaan saya adalah apakah kita dapat (misalnya) memperkenalkan proses pembersihan latar belakang yang memperbarui "keadaan awal" sehingga kita tidak perlu memulai ulang dari awal. Ini tidak sesederhana memiliki penunjuk atomik dengan penunjuk awal - pendekatan semacam ini dengan mudah kehilangan jaminan tunggu-bebas. Kecurigaan saya adalah bahwa beberapa pendekatan berbasis antrian mungkin bekerja di sini.

Jargon:

  1. tunggu-bebas - terlepas dari jumlah utas atau pengambilan keputusan penjadwal, applyakan berakhir dalam jumlah instruksi yang terbukti terbatas yang dieksekusi untuk utas itu.
  2. kunci-bebas - sama seperti di atas, tetapi mengakui kemungkinan waktu eksekusi tanpa batas, hanya dalam kasus bahwa sejumlah applyoperasi tanpa batas dilakukan di utas lainnya. Biasanya, skema sinkronisasi optimis termasuk dalam kategori ini.
  3. pemblokiran - efisiensi karena belas kasihan penjadwal.

Contoh yang berfungsi, seperti yang diminta (sekarang pada halaman yang tidak akan kedaluwarsa)

[1] Herlihy dan Shavit, Seni Pemrograman Multi-Prosesor .

VF1
sumber
Pertanyaan 1 hanya dapat dijawab jika kami tahu apa artinya "berhasil" bagi Anda.
Robert Harvey
@RobertHarvey Saya memperbaikinya - semua yang diperlukan untuk "bekerja" adalah agar pembungkusnya menjadi bebas menunggu dan semua operasi CopyableSequentialharus valid - linierabilitas kemudian harus mengikuti dari fakta bahwa itu Sequential.
VF1
Ada banyak kata yang bermakna dalam pertanyaan ini, tetapi saya berjuang untuk menyatukannya untuk memahami dengan tepat apa yang ingin Anda capai. Bisakah Anda memberikan beberapa penjelasan tentang masalah apa yang Anda coba selesaikan dan mungkin sedikit mengurangi jargonnya?
JimmyJames
@JimmyJames Saya telah menguraikan "komentar panjang" di dalam pertanyaan. Tolong beri tahu saya jika ada Jargon lain yang perlu dihapus.
VF1
dalam paragraf pertama dari komentar Anda mengatakan "objek thread-tidak aman tetapi kompatibel dengan thread" dan "versi objek yang dapat di-linearizable". Tidak jelas apa yang Anda maksud dengan itu karena thread-safe dan linearizable hanya benar-benar relevan dengan instruksi yang dapat dieksekusi tetapi Anda menggunakannya untuk menggambarkan objek, yang merupakan data. Saya berasumsi bahwa Invokasi (yang tidak didefinisikan) secara efektif adalah pointer metode dan metode yang tidak aman untuk thread. Saya tidak tahu apa artinya thread-kompatibel .
JimmyJames

Jawaban:

1

Berikut ini penjelasan dan contoh bagaimana hal ini dilakukan. Beri tahu saya jika ada bagian yang tidak jelas.

Intinya dengan sumber

Universal

Inisialisasi:

Indeks thread diterapkan dengan cara yang bertambah secara atom. Ini dikelola menggunakan AtomicIntegernama nextIndex. Indeks-indeks ini ditugaskan untuk utas melalui sebuah ThreadLocalinstance yang menginisialisasi dirinya dengan mendapatkan indeks berikutnya dari nextIndexdan menambahnya. Ini terjadi saat pertama kali setiap indeks utas diambil pertama kali. A ThreadLocaldibuat untuk melacak urutan terakhir yang dibuat utas ini. Ini diinisialisasi 0. Referensi objek pabrik berurutan dilewatkan dan disimpan. Dua AtomicReferenceArraycontoh dibuat dari ukuran n. Objek ekor ditugaskan untuk setiap referensi, yang telah diinisialisasi dengan keadaan awal yang disediakan oleh Sequentialpabrik. nadalah jumlah maksimum utas yang diizinkan. Setiap elemen dalam array ini 'milik' indeks utas terkait.

Terapkan metode:

Ini adalah metode yang melakukan pekerjaan yang menarik. Ini melakukan hal berikut:

  • Buat simpul baru untuk doa ini: milikku
  • Setel simpul baru ini di array yang diumumkan di indeks utas saat ini

Kemudian loop pengurutan dimulai. Itu akan berlanjut sampai doa saat ini telah diurutkan:

  1. temukan sebuah simpul dalam array yang diumumkan menggunakan urutan simpul terakhir yang dibuat oleh utas ini. Lebih lanjut tentang ini nanti.
  2. jika sebuah node ditemukan pada langkah 2 itu belum diurutkan, lanjutkan dengan itu, jika tidak, hanya fokus pada doa saat ini. Ini hanya akan mencoba membantu satu simpul lainnya per doa.
  3. Apa pun simpul yang dipilih pada langkah 3, teruslah mencoba untuk mengurutkannya setelah simpul berurutan terakhir (utas lainnya dapat mengganggu.) Terlepas dari keberhasilan, setel referensi kepala utas saat ini ke urutan yang dikembalikan oleh decideNext()

Kunci untuk loop bersarang yang dijelaskan di atas adalah decideNext()metode. Untuk memahami itu, kita perlu melihat kelas Node.

Kelas simpul

Kelas ini menentukan node dalam daftar yang ditautkan dua kali lipat. Tidak banyak aksi di kelas ini. Sebagian besar metode adalah metode pengambilan sederhana yang harus cukup jelas.

metode ekor

ini mengembalikan instance simpul khusus dengan urutan 0. Ini hanya bertindak sebagai tempat-pemegang sampai doa menggantikannya.

Properti dan inisialisasi

  • seq: nomor urut, diinisialisasi ke -1 (artinya tidak diikuti)
  • invocation: nilai doa apply(). Ditetapkan pada konstruksi.
  • next: AtomicReferenceuntuk tautan penerusan. sekali ditugaskan, ini tidak akan pernah berubah
  • previous: AtomicReferenceuntuk tautan terbelakang yang ditugaskan pada pengurutan dan dihapus olehtruncate()

Putuskan Selanjutnya

Metode ini hanya satu di Node dengan logika non-trivial. Singkatnya, sebuah simpul ditawarkan sebagai kandidat untuk menjadi simpul berikutnya dalam daftar tertaut. The compareAndSet()Metode akan memeriksa apakah referensi itu adalah null dan jika demikian, mengatur referensi untuk calon. Jika referensi sudah diset, itu tidak melakukan apa-apa. Operasi ini bersifat atomik sehingga jika dua kandidat ditawarkan pada saat yang sama, hanya satu yang akan dipilih. Ini menjamin hanya satu simpul yang akan dipilih sebagai yang berikutnya. Jika kandidat node dipilih, urutannya diatur ke nilai berikutnya, dan tautan sebelumnya diatur ke simpul ini.

Melompat Kembali ke metode berlaku kelas Universal ...

Setelah memanggil decideNext()simpul sekuens terakhir (saat dicentang) dengan simpul kami atau simpul dari announcearray, ada dua kemungkinan kejadian: 1. Node berhasil diurutkan 2. Beberapa utas lain mencegah utas ini.

Langkah selanjutnya adalah memeriksa apakah simpul dibuat untuk permohonan ini. Ini bisa terjadi karena utas ini berhasil mengurutkannya atau beberapa utas lainnya mengambilnya dari announcearray dan mengurutkannya untuk kami. Jika belum diurutkan, prosesnya diulang. Kalau tidak, panggilan selesai dengan menghapus array yang diumumkan di indeks utas ini dan mengembalikan nilai hasil dari doa. Array announce dihapus untuk menjamin tidak ada referensi ke node yang tersisa yang akan mencegah node dari pengumpulan sampah dan oleh karena itu menjaga semua node dalam daftar tertaut mulai saat itu tetap hidup di heap.

Evaluasi metode

Sekarang setelah simpul doa berhasil diurutkan, doa perlu dievaluasi. Untuk melakukan itu, langkah pertama adalah memastikan bahwa doa sebelum ini telah dievaluasi. Jika mereka tidak memiliki utas ini tidak akan menunggu tetapi akan segera bekerja.

Metode EnsurePrior

The ensurePrior()Metode ini bekerja dengan memeriksa node sebelumnya dalam linked list. Jika statusnya tidak disetel, simpul sebelumnya akan dievaluasi. Node yang ini bersifat rekursif. Jika node sebelum node sebelumnya belum dievaluasi, itu akan memanggil evaluasi untuk node itu dan seterusnya.

Sekarang node sebelumnya diketahui memiliki status, kita dapat mengevaluasi node ini. Node terakhir diambil dan ditugaskan ke variabel lokal. Jika referensi ini adalah nol, itu berarti bahwa beberapa utas lain telah lebih dulu dari yang ini dan sudah mengevaluasi simpul ini; pengaturan negara itu. Jika tidak, keadaan simpul sebelumnya diteruskan ke metode Sequentialobjek yang diterapkan bersama dengan pemanggilan simpul ini. Keadaan dikembalikan diatur pada node dan truncate()metode ini disebut, membersihkan tautan mundur dari node karena tidak lagi diperlukan.

Metode MoveForward

Metode maju akan mencoba untuk memindahkan semua referensi kepala ke simpul ini jika mereka belum menunjuk ke sesuatu yang lebih jauh. Ini untuk memastikan bahwa jika utas berhenti memanggil, kepalanya tidak akan mempertahankan referensi ke simpul yang tidak lagi diperlukan. The compareAndSet()metode akan memastikan bahwa kami hanya memperbarui node jika beberapa thread lain tidak berubah sejak itu diambil.

Umumkan susunan dan bantuan

Kunci untuk membuat pendekatan ini bebas-tunggu dan bukan hanya bebas-penguncian adalah bahwa kita tidak dapat berasumsi bahwa penjadwal thread akan memberikan prioritas setiap thread saat dibutuhkan. Jika setiap utas hanya mencoba mengurutkan simpul-simpulnya sendiri, ada kemungkinan bahwa utas dapat terus-menerus dipersiapkan sebelumnya di bawah beban. Untuk memperhitungkan kemungkinan ini, setiap utas pertama-tama akan mencoba 'membantu' utas lainnya yang mungkin tidak dapat diurutkan.

Ide dasarnya adalah bahwa setiap thread berhasil membuat node, urutan yang diberikan meningkat secara monoton. Jika utas atau utas terus-menerus mencegah utas lainnya, indeks penggunaan untuk menemukan simpul yang tidak diikuti dalam announcelarik akan bergerak maju. Bahkan jika setiap utas yang saat ini mencoba untuk mengurutkan simpul yang diberikan secara terus-menerus diawali oleh utas lain, akhirnya semua utas akan mencoba mengurutkan simpul itu. Sebagai ilustrasi, kami akan membuat contoh dengan tiga utas.

Pada titik awal, kepala ketiga elemen dan mengumumkan elemen diarahkan ke tailnode. Untuk lastSequencesetiap utas adalah 0.

Pada titik ini, Thread 1 dijalankan dengan doa. Ia memeriksa array yang mengumumkan untuk urutan terakhir (nol) yang merupakan simpul yang saat ini dijadwalkan untuk diindeks. Ini mengurutkan node dan lastSequencediatur ke 1.

Thread 2 sekarang dijalankan dengan doa, itu memeriksa array yang mengumumkan di urutan terakhirnya (nol) dan melihat bahwa itu tidak membutuhkan bantuan dan upaya untuk mengurutkan doa itu. Berhasil dan sekarang lastSequencediatur ke 2.

Thread 3 sekarang dieksekusi dan ia juga melihat bahwa simpul di announce[0]sudah diurutkan dan urutan doa itu sendiri. Ini lastSequencesekarang sudah siap untuk 3.

Sekarang Thread 1 dipanggil lagi. Itu memeriksa array mengumumkan di indeks 1 dan menemukan bahwa itu sudah diurutkan. Bersamaan, Thread 2 dipanggil. Itu memeriksa array mengumumkan di indeks 2 dan menemukan bahwa itu sudah diurutkan. Baik Thread 1 dan Thread 2 sekarang mencoba untuk mengurutkan node mereka sendiri. Thread 2 menang dan mengurutkan permohonannya. Sudah lastSequencediatur ke 4. Sementara itu, utas tiga telah dipanggil. Ia memeriksa indeksnya lastSequence(mod 3) dan menemukan bahwa simpul di announce[0]belum diurutkan. Utas 2 kembali dipanggil pada saat yang sama dengan Utas 1 pada upaya kedua. Utas 1menemukan permohonan announce[1]yang tidak diikuti di mana merupakan simpul yang baru saja dibuat oleh Thread 2 . Ia mencoba untuk mengurutkan permintaan Thread 2 dan berhasil. Thread 2 menemukan simpul itu sendiri di announce[1]dan telah diurutkan. Ini diatur lastSequenceke 5. Thread 3 kemudian dipanggil dan menemukan bahwa simpul yang ditempatkan 1 thread announce[0]masih belum diurutkan dan upaya untuk melakukannya. Sementara itu Thread 2 juga telah dipanggil dan lebih dulu Thread 3. Urutan itu simpul dan set lastSequenceke 6.

Thread Buruk 1 . Meskipun Thread 3 sedang mencoba untuk mengurutkannya, kedua utas telah terus-menerus digagalkan oleh penjadwal. Tetapi pada titik ini. Thread 2 juga sekarang menunjuk ke announce[0](6 mod 3). Ketiga utas diatur untuk mencoba mengurutkan permohonan yang sama. Tidak peduli thread mana yang berhasil, simpul berikutnya yang akan diurutkan akan menjadi doa menunggu Thread 1 yaitu simpul yang dirujuk oleh announce[0].

Ini tidak bisa dihindari. Agar utas menjadi pre-empted, utas lainnya harus mengurutkan node dan saat mereka melakukannya, mereka akan terus bergerak lastSequencemaju. Jika simpul utas yang diberikan terus-menerus tidak diurutkan, akhirnya semua utas akan menunjuk ke indeksnya dalam array yang diumumkan. Tidak ada utas yang akan melakukan hal lain sampai simpul yang ia coba bantu telah diurutkan, skenario kasus terburuk adalah bahwa semua utas menunjuk ke simpul yang tidak diikuti yang sama. Oleh karena itu, waktu yang diperlukan untuk mengurutkan setiap doa adalah fungsi dari jumlah utas dan bukan ukuran input.

JimmyJames
sumber
Maukah Anda menaruh beberapa kutipan kode pada pastebin? Banyak hal (seperti daftar tertaut bebas kunci) dapat dengan sederhana dinyatakan seperti itu? Agak sulit untuk memahami jawaban Anda secara keseluruhan ketika ada begitu banyak detail. Bagaimanapun, ini terlihat menjanjikan, saya pasti ingin menggali ke dalam apa jaminan yang diberikannya.
VF1
Ini jelas tampak seperti implementasi bebas-kunci yang valid, tetapi tidak ada masalah mendasar yang saya khawatirkan. Persyaratan kelurusan linier mengharuskan adanya "riwayat yang valid" untuk hadir, yang, dalam hal penerapan daftar-tertaut, membutuhkan a previousdan nextpenunjuk menjadi valid. Mempertahankan dan membuat riwayat yang valid dengan cara bebas-tunggu tampaknya sulit.
VF1
@ VF1 Saya tidak yakin masalah apa yang tidak diatasi. Segala sesuatu yang Anda sebutkan di sisa komentar dibahas dalam contoh yang saya berikan, dari apa yang dapat saya katakan.
JimmyJames
Anda telah melepaskan properti bebas-tunggu .
VF1
@ VF1 Bagaimana menurut Anda?
JimmyJames
0

Jawaban saya sebelumnya tidak benar-benar menjawab pertanyaan dengan benar tetapi karena OP melihatnya berguna, saya akan membiarkannya apa adanya. Berdasarkan kode di tautan dalam pertanyaan, inilah upaya saya. Saya hanya melakukan pengujian yang sangat mendasar untuk hal ini tetapi tampaknya menghitung rata-rata dengan benar. Umpan balik disambut baik apakah ini benar bebas menunggu.

CATATAN : Saya menghapus antarmuka Universal dan menjadikannya kelas. Memiliki Universal terdiri dari Sequentials dan juga menjadi salah satu sepertinya komplikasi yang tidak perlu tapi saya mungkin kehilangan sesuatu. Di kelas rata-rata, saya telah menandai variabel keadaan menjadi volatile. Ini tidak perlu untuk membuat kode berfungsi. Menjadi konservatif (ide bagus dengan threading) dan mencegah setiap utas melakukan semua perhitungan (satu kali).

Berurutan & Pabrik

public interface Sequential<E, S, R>
{ 
  R apply(S priorState);

  S state();

  default boolean isApplied()
  {
    return state() != null;
  }
}

public interface Factory<E, S, R>
{
   S initial();

   Sequential<E, S, R> generate(E input);
}

Universal

import java.util.concurrent.ConcurrentLinkedQueue;

public class Universal<I, S, R> 
{
  private final Factory<I, S, R> generator;
  private final ConcurrentLinkedQueue<Sequential<I, S, R>> wfq = new ConcurrentLinkedQueue<>();
  private final ThreadLocal<Sequential<I, S, R>> last = new ThreadLocal<>();

  public Universal(Factory<I, S, R> g)
  { 
    generator = g;
  }

  public R apply(I invocation)
  {
    Sequential<I, S, R> newSequential = generator.generate(invocation);
    wfq.add(newSequential);

    Sequential<I, S, R> last = null;
    S prior = generator.initial(); 

    for (Sequential<I, S, R> i : wfq) {
      if (!i.isApplied() || newSequential == i) {
        R r = i.apply(prior);

        if (i == newSequential) {
          wfq.remove(last.get());
          last.set(newSequential);

          return r;
        }
      }

      prior = i.state();
    }

    throw new IllegalStateException("Houston, we have a problem");
  }
}

Rata-rata

public class Average implements Sequential<Integer, Average.State, Double>
{
  private final Integer invocation;
  private volatile State state;

  private Average(Integer invocation)
  {
    this.invocation = invocation;
  }

  @Override
  public Double apply(State prior)
  {
    System.out.println(Thread.currentThread() + " " + invocation + " prior " + prior);

    state = prior.add(invocation);

    return ((double) state.sum)/ state.count;
  }

  @Override
  public State state()
  {
    return state;
  }

  public static class AverageFactory implements Factory<Integer, State, Double> 
  {
    @Override
    public State initial()
    {
      return new State(0, 0);
    }

    @Override
    public Average generate(Integer i)
    {
      return new Average(i);
    }
  }

  public static class State
  {
    private final int sum;
    private final int count;

    private State(int sum, int count)
    {
      this.sum = sum;
      this.count = count;
    }

    State add(int value)
    {
      return new State(sum + value, count + 1);
    }

    @Override
    public String toString()
    {
      return sum + " / " + count;
    }
  }
}

Kode demo

private static final int THREADS = 10;
private static final int SIZE = 50;

public static void main(String... args)
{
  Average.AverageFactory factory = new Average.AverageFactory();

  Universal<Integer, Average.State, Double> universal = new Universal<>(factory);

  for (int i = 0; i < THREADS; i++)
  {
    new Thread(new Test(i * SIZE, universal)).start();
  }
}

static class Test implements Runnable
{
  final int start;
  final Universal<Integer, Average.State, Double> universal;

  Test(int start, Universal<Integer, Average.State, Double> universal)
  {
    this.start = start;
    this.universal = universal;
  }

  @Override
  public void run()
  {
    for (int i = start; i < start + SIZE; i++)
    {
      System.out.println(Thread.currentThread() + " " + i);

      System.out.println(System.nanoTime() + " " + Thread.currentThread() + " " + i + " result " + universal.apply(i));
    }
  }
}

Saya melakukan beberapa pengeditan pada kode ketika saya memposting di sini. Seharusnya tidak masalah, tetapi beri tahu saya jika Anda memiliki masalah dengannya.

JimmyJames
sumber
Anda tidak harus terus memberikan jawaban bagi saya (saya sebelumnya telah memperbarui pertanyaan saya untuk mendapatkan kesimpulan yang relevan untuk diambil darinya). Sayangnya, jawaban ini juga tidak menjawab pertanyaan, karena itu tidak benar-benar membebaskan memori di dalamnya wfq, jadi Anda masih harus menelusuri seluruh riwayat - runtime tidak membaik kecuali oleh faktor konstan.
VF1
@ Vf1 Waktu yang diperlukan untuk menelusuri seluruh daftar untuk memeriksa apakah telah dihitung akan sangat kecil dibandingkan dengan melakukan setiap perhitungan. Karena status sebelumnya tidak diperlukan, harus dimungkinkan untuk menghapus status awal. Pengujian sulit dan mungkin memerlukan menggunakan koleksi yang dikustomisasi tetapi saya telah menambahkan perubahan kecil.
JimmyJames
@ VF1 Diperbarui pada implementasi yang tampaknya bekerja dengan pengujian sepintas dasar. Saya tidak yakin itu aman tetapi dari atas kepala saya, jika universal mengetahui thread yang bekerja dengannya, itu bisa melacak setiap thread dan menghapus elemen setelah semua thread aman melewatinya.
JimmyJames
@ VF1 Melihat kode untuk ConcurrentLinkedQueue, metode penawaran memiliki loop seperti yang Anda klaim membuat jawaban lain tidak menunggu-bebas. Lihat komentar "Balapan CAS yang hilang ke utas lain; baca kembali berikutnya"
JimmyJames
"Seharusnya mungkin untuk menghapus status awal" - persis. Ini harus , tapi mudah untuk secara halus memperkenalkan kode yang kehilangan kebebasan menunggu. Skema pelacakan utas mungkin berfungsi. Akhirnya, saya tidak memiliki akses ke sumber CLQ, maukah Anda menautkan?
VF1