Bagaimana saya bisa memastikan bahwa suatu pekerjaan tidak berjalan dua kali di Bull?

11

Saya memiliki dua fungsi, scheduleScan()dan scan().

scan()panggilan scheduleScan() saat tidak ada lagi yang harus dilakukan selain menjadwalkan pemindaian baru , jadi scheduleScan()dapat menjadwalkan a scan(). Tapi ada masalah, beberapa pekerjaan berjalan dua kali.

Saya ingin memastikan bahwa hanya satu pekerjaan yang sedang diproses pada waktu tertentu. Bagaimana saya bisa mencapainya? Saya percaya ini ada hubungannya done(), (sudah dalam pemindaian (), dihapus sekarang) tapi saya tidak bisa menemukan solusi.

Versi banteng: 3.12.1

Edit terlambat yang penting: scan() memanggil fungsi lain dan mereka mungkin atau mungkin tidak memanggil fungsi lain, tetapi mereka semua fungsi sinkronisasi, jadi mereka hanya memanggil fungsi ketika pekerjaan mereka sendiri selesai, hanya ada satu jalan ke depan. Pada akhir "tree", saya menyebutnya, fungsi terakhir memanggil scheduleScan (), tetapi tidak mungkin ada dua pekerjaan simultan yang berjalan. Setiap pekerjaan dimulai scan(), dengan cara, dan mereka berakhir denganscheduleScan(stock, period, milliseconds, 'called by file.js')

export function update(job) {
  // does some calculations, then it may call scheduleScan() or
  // it may call another function, and that could be the one calling
  // scheduleScan() function.
  // For instance, a function like finalize()
}

export function scan(job) {
  update(job)
}


import moment from 'moment'
import stringHash from 'string-hash'
const opts = { redis: { port: 6379, host: '127.0.0.1', password: mypassword' } }
let queue = new Queue('scan', opts)

queue.process(1, (job) => {
  job.progress(100).then(() => {
    scan(job)
  })
})

export function scheduleScan (stock, period, milliseconds, triggeredBy) {
  let uniqueId = stringHash(stock + ':' + period)

  queue.getJob(uniqueId).then(job => {
    if (!job) {
      if (milliseconds) {
        queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
          // console.log('Added with ms: ' + stock + ' ' + period)
        }).catch(err => {
          if (err) {
            console.log('Can not add because it exists ' + new Date())
          }
        })
      } else {
        queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
          // console.log('Added without ms: ' + stock + ' ' + period)
        }).catch(err => {
          if (err) {
            console.log('Can not add because it exists ' + new Date())
          }
        })
      }
    } else {
      job.getState().then(state => {
        if (state === 'completed') {
          job.remove().then(() => {
            if (milliseconds) {
              queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
                // console.log('Added with ms: ' + stock + ' ' + period)
              }).catch(err => {
                if (err) {
                  console.log('Can not add because it exists ' + new Date())
                }
              })
            } else {
              queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
                // console.log('Added without ms: ' + stock + ' ' + period)
              }).catch(err => {
                if (err) {
                  console.log('Can not add because it exists ' + new Date())
                }
              })
            }
          }).catch(err => {
            if (err) {
              // console.log(err)
            }
          })
        }
      }).catch(err => {
        // console.log(err)
      })
    }
  })
}
salep
sumber
Saya tidak dapat menemukan scanfungsi, dapatkah Anda membantu?
Muhammad Zeeshan
@MuhammadZeeshan saya menambahkannya, kesalahan saya.
salep

Jawaban:

6

Masalahnya, saya percaya scanfungsi Anda async. Jadi job.progressfungsi Anda memanggil scandan kemudian segera memanggil donememungkinkan antrian untuk memproses pekerjaan lain.

Sebuah solusi bisa dengan meneruskan donepanggilan balik sebagai parameter untuk Anda scandan scheduleScanfungsinya, dan memohonnya, setelah Anda menyelesaikan pekerjaan Anda (atau karena kesalahan).

Solusi (yang lebih baik) lainnya adalah memastikan bahwa Anda selalu mengembalikan a Promisedari scandan scheduleScan, kemudian menunggu janji untuk menyelesaikan dan kemudian menelepon done. Jika melakukan ini, pastikan Anda mengaitkan semua janji Anda kembali dalam scheduleScanfungsi Anda .

queue.process(1, (job, done) => {
  job.progress(100).then(() => {
    scan(job)
        .then(done)
        .catch(done)
  })
})

export function scan() {
   // business logic
   return scheduleScan()
}

// Chain all of your promise returns. Otherwise
// the scan function will return sooner and allow done to be called
// prior to the scheduleScan function finishing it's execution
export function scheduleScan() {
    return queue.getJob(..).then(() => {
        ....
        return queue.add()...
        ....
        return queue.add(...)
            .catch(e => {
                 console.log(e);
                 // propogate errors!
                 throw e;
             })

}
jeeves
sumber
Saya telah mengedit pertanyaan saya, dapatkah Anda memeriksanya kembali, terutama bagian "Edit terlambat penting"? Apakah jawaban Anda masih berlaku dalam situasi ini? Terima kasih.
salep
1
Ya, itu masih berlaku. Dari hasil edit Anda, saya pikir yang Anda katakan scheduledScanselalu dipanggil setelah semua fungsi sinkronisasi lainnya masuk scan. Jika ini masalahnya, maka ya, jawaban saya masih valid. Hanya selalu kembali janji yang akan kembali dari scheduleScandalam scanfungsi
Jeeves
Sekali lagi, kesalahanku. Fungsi pertama, perbarui (), dalam pemindaian, tetapi pembaruan () dapat memanggil fungsi lain seperti finalize (), dan finalize () dapat memanggil scheduleScan (). Harap diingat bahwa ini terjadi secara berurutan, jadi tidak ada banyak panggilan, saya melakukan ini untuk menjaga agar aplikasi saya tetap modular. - Terima kasih
salep
1
Yap, jawaban yang sama. Jika ada updatepanggilan scheduledScanatau sejumlah fungsi di antara mereka. Poin kuncinya adalah Anda harus mengembalikan rantai janji dari scheduleScansemua jalan kembali ke scanfungsi. Jadi, jika scanpanggilan updateyang memanggil finalise..... Yang memanggil scheduleScanrantai janji harus dikembalikan melalui semua pemanggilan fungsi, yaitu Pastikan Anda mengembalikan janji dari masing-masing fungsi ini.
Astaga
Jadi hanya untuk memperjelas komentar terakhir saya. Misalnya, jika di dalam pemindaian Anda memanggil pembaruan. Anda harus mengembalikan hasil pembaruan (janji) dari fungsi pemindaian.
ampun
4

Fungsi pemindaian adalah fungsi asinkron. Dalam queue.process()fungsi Anda, Anda harus menunggu fungsi pemindaian dan kemudian memanggil panggilan done()balik.

export async function scan(job) {
  // it does some calculations, then it creates a new schedule.
  return scheduleScan(stock, period, milliseconds, "scan.js");
}

queue.process(1, (job, done) => {
  job.progress(100).then(async() => {
    await scan(job);
    done();
  });
});

export async function scheduleScan(stock, period, milliseconds, triggeredBy) {
    let uniqueId = stringHash(stock + ":" + period);
    try {
      const existingJob = await queue.getJob(uniqueId);
      if (!existingJob) {
        const job = await addJob({
          queue,
          stock,
          period,
          uniqueId,
          milliseconds,
          triggeredBy
        });
        return job;
      } else {
        const jobState = await existingJob.getState();
        if (jobState === "completed") {
          await existingJob.remove();
          const newJob = await addJob({
            queue,
            stock,
            period,
            uniqueId,
            milliseconds,
            triggeredBy
          });
          return newJob;
        }
      }
    } catch (err) {
      throw new Error(err);
    }
}

export function addJob({ queue, stock, period, milliseconds, triggeredBy }) {
  if (milliseconds) {
    return queue.add(
      { stock, period, triggeredBy },
      { delay: milliseconds, jobId: uniqueId }
    );
  } else {
    return queue.add({ stock, period, triggeredBy }, { jobId: uniqueId });
  }
}

Coba ini! Saya sudah mencoba untuk sedikit memperbaiki kode dengan menggunakan async-wait.

Adithya Sreyaj
sumber
Saya telah mengedit pertanyaan saya, dapatkah Anda memeriksanya kembali, terutama bagian "Edit terlambat penting"? Apakah jawaban Anda masih berlaku dalam situasi ini? Terima kasih.
salep