Cara terbaik untuk memindahkan pesan dari DLQ di Amazon SQS?

88

Apa praktik terbaik untuk memindahkan pesan dari antrian surat mati kembali ke antrian asli di Amazon SQS?

Apakah itu

  1. Dapatkan pesan dari DLQ
  2. Tulis pesan ke antrian
  3. Hapus pesan dari DLQ

Atau ada cara yang lebih sederhana?

Selain itu, apakah AWS pada akhirnya akan memiliki alat di konsol untuk memindahkan pesan dari DLQ?

Matt Dell
sumber
github.com/garryyao/replay-aws-dlq bekerja dengan cukup baik
Ulad Kasach
juga alternatif lain github.com/mercury2269/sqsmover
Sergey

Jawaban:

135

Ini hack cepat. Ini jelas bukan pilihan terbaik atau direkomendasikan.

  1. Tetapkan antrean SQS utama sebagai DLQ untuk DLQ aktual dengan Penerimaan Maksimum sebagai 1.
  2. Lihat konten dalam DLQ (Ini akan memindahkan pesan ke antrian utama karena ini adalah DLQ untuk DLQ yang sebenarnya)
  3. Hapus pengaturan sehingga antrian utama tidak lagi menjadi DLQ dari DLQ yang sebenarnya
Rajkumar
sumber
12
Ya, ini adalah peretasan yang sangat banyak - tetapi opsi yang bagus untuk perbaikan cepat jika Anda tahu apa yang Anda lakukan dan tidak punya waktu untuk menyelesaikannya dengan cara yang tepat #yolo
Thomas Watson
14
Tetapi jumlah penerimaan tidak diatur ulang ke 0 saat Anda melakukan ini. Hati-hati.
Rajdeep Siddhapura
1
Pendekatan yang tepat adalah mengonfigurasi Kebijakan Redrive di SQS dengan jumlah penerimaan maksimal dan secara otomatis akan memindahkan pesan ke DLQ ketika akan melewati jumlah penerimaan yang ditetapkan, lalu menulis utas pembaca untuk dibaca dari DLQ.
Abu
5
Kamu jenius.
JefClaes
1
Saya membuat alat CLI untuk masalah ini beberapa bulan yang lalu: github.com/renanvieira/phoenix-letter
MaltMaster
15

Ada beberapa skrip di luar sana yang melakukan ini untuk Anda:

# install
npm install replay-aws-dlq;

# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source

# use
sqsmover -s [source_queue_url] -d [dest_queue_url] 
Ulad Kasach
sumber
1
Ini adalah cara paling sederhana, tidak seperti jawaban yang diterima. Jalankan saja ini dari terminal yang memiliki set properti AWS env vars:npx replay-aws-dlq DL_URI MAIN_URI
Vasyl Boroviak
Perhatikan salah ketik: dql -> dlq # install npm install replay-aws-dlq;
Lee Oades
Ini bekerja dengan sempurna untuk saya (perhatikan, saya hanya mencoba yang berbasis go). Tampak memindahkan pesan secara bertahap dan tidak sekaligus (hal yang baik) bahkan memiliki progress bar. Lebih baik dari jawaban yang diterima IMO.
Yevgeny Ananin
Ada entri blog AWS baru-baru ini yang menggunakan Lambda untuk menyelesaikan tugas yang diberikan. Itu juga diterbitkan di repositori aplikasi tanpa server AWS: aws.amazon.com/blogs/compute/… (Saya belum mencobanya, karena saya akan melakukan peretasan cepat di atas, tetapi ini sepertinya cara yang harus dilakukan)
th-
13

Tidak perlu memindahkan pesan karena akan datang dengan begitu banyak tantangan lain seperti pesan duplikat, skenario pemulihan, pesan hilang, pemeriksaan de-duplikasi, dll.

Inilah solusi yang kami terapkan -

Biasanya, kami menggunakan DLQ untuk kesalahan sementara, bukan untuk kesalahan permanen. Jadi ambil pendekatan di bawah ini -

  1. Baca pesan dari DLQ seperti antrian biasa

    Manfaat
    • Untuk menghindari pemrosesan pesan duplikat
    • Kontrol yang lebih baik pada DLQ- Seperti saya memberi tanda centang, untuk memproses hanya ketika antrian biasa benar-benar diproses.
    • Tingkatkan proses berdasarkan pesan di DLQ
  2. Kemudian ikuti kode yang sama yang mengikuti antrian biasa.

  3. Lebih andal jika pekerjaan dibatalkan atau proses dihentikan saat memproses (mis. Instance dimatikan atau proses dihentikan)

    Manfaat
    • Kode dapat digunakan kembali
    • Penanganan kesalahan
    • Pemulihan dan pemutaran ulang pesan
  4. Perluas visibilitas pesan sehingga tidak ada thread lain yang memprosesnya.

    Manfaat
    • Hindari memproses rekaman yang sama dengan banyak utas.
  5. Hapus pesan hanya jika ada kesalahan permanen atau berhasil.

    Manfaat
    • Lanjutkan proses sampai kami mendapatkan error sementara.
Abu
sumber
Saya sangat suka pendekatan Anda! Bagaimana Anda mendefinisikan "kesalahan permanen" dalam kasus ini?
DMac the Destroyer
Apa pun yang lebih besar dari kode status HTTP> 200 <500 adalah kesalahan permanen
Ash
ini memang pendekatan yang bagus dalam produksi. namun saya pikir posting ini menanyakan cara mengirim ulang pesan dari DLQ ke antrian normal. yang terkadang berguna jika Anda tahu apa yang Anda lakukan.
linehrr
Itulah yang saya katakan bahwa Anda tidak boleh melakukannya. Karena jika Anda melakukannya maka akan menimbulkan lebih banyak masalah. Kita dapat memindahkan pesan seperti pesan push lainnya tetapi akan kehilangan fungsi DLQ seperti jumlah penerimaan, visibilitas, dan semuanya. Ini akan diperlakukan sebagai pesan baru.
Abu
6

Sepertinya itu pilihan terbaik Anda. Ada kemungkinan bahwa proses Anda gagal setelah langkah 2. Dalam hal ini Anda akan menyalin pesan dua kali, tetapi aplikasi Anda harus menangani pengiriman ulang pesan (atau tidak peduli).

Dave
sumber
6

sini:

import boto3
import sys
import Queue
import threading

work_queue = Queue.Queue()

sqs = boto3.resource('sqs')

from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)

from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)

def process_queue():
    while True:
        messages = work_queue.get()

        bodies = list()
        for i in range(0, len(messages)):
            bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})

        to_q.send_messages(Entries=bodies)

        for message in messages:
            print("Coppied " + str(message.body))
            message.delete()

for i in range(10):
     t = threading.Thread(target=process_queue)
     t.daemon = True
     t.start()

while True:
    messages = list()
    for message in from_q.receive_messages(
            MaxNumberOfMessages=10,
            VisibilityTimeout=123,
            WaitTimeSeconds=20):
        messages.append(message)
    work_queue.put(messages)

work_queue.join()
Brian Dilley
sumber
Apakah ini Python?
carlin.scott
python2 sebenarnya
Kristof Jozsa
4

Ada cara lain untuk mencapai ini tanpa menulis satu baris kode pun. Pertimbangkan nama antrian Anda yang sebenarnya adalah SQS_Queue dan DLQ untuk itu adalah SQS_DLQ. Sekarang ikuti langkah-langkah ini:

  1. Setel SQS_Queue sebagai dlq dari SQS_DLQ. Karena SQS_DLQ sudah menjadi dlq dari SQS_Queue. Sekarang, keduanya bertindak sebagai dlq yang lain.
  2. Setel jumlah penerimaan maksimal SQS_DLQ Anda ke 1.
  3. Sekarang baca pesan dari konsol SQS_DLQ. Karena jumlah penerimaan pesan adalah 1, itu akan mengirim semua pesan ke dlq-nya sendiri yang merupakan antrian SQS_Queue Anda yang sebenarnya.
Priyanka Agarwal
sumber
Itu akan menggagalkan tujuan mempertahankan DLQ. DLQ dimaksudkan untuk tidak membebani sistem Anda secara berlebihan ketika Anda mengamati kegagalan sehingga Anda dapat melakukannya nanti.
Buddha
1
Ini pasti akan menggagalkan tujuan dan Anda tidak akan dapat memperoleh manfaat lain seperti peningkatan, pembatasan, dan penerimaan hitungan. Selain itu, Anda harus menggunakan antrian biasa sebagai antrian pemrosesan dan jika jumlah pesan yang diterima mencapai 'N' maka itu harus pergi ke DLQ. Inilah yang idealnya, itu harus dikonfigurasi.
Ash
3
Sebagai solusi 1 kali untuk mendorong ulang banyak pesan, ini berfungsi seperti pesona. Bukan solusi jangka panjang yang baik.
nmio
Ya, ini sangat berharga sebagai solusi satu kali untuk redrive pesan (setelah memperbaiki masalah di antrian utama). Pada AWS CLI perintah yang saya digunakan adalah: aws sqs receive-message --queue-url <url of DLQ> --max-number-of-messages 10. Karena pesan maksimal Anda dapat membaca huruf besar pada 10, saya sarankan untuk menjalankan perintah dalam lingkaran seperti ini:for i in {1..1000}; do <CMD>; done
Patrick Finnigan
3

Saya menulis skrip python kecil untuk melakukan ini, dengan menggunakan boto3 lib:

conf = {
  "sqs-access-key": "",
  "sqs-secret-key": "",
  "reader-sqs-queue": "",
  "writer-sqs-queue": "",
  "message-group-id": ""
}

import boto3
client = boto3.client(
    'sqs',
        aws_access_key_id       = conf.get('sqs-access-key'),
        aws_secret_access_key   = conf.get('sqs-secret-key')
)

while True:
    messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)

    if 'Messages' in messages:
        for m in messages['Messages']:
            print(m['Body'])
            ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
            print(ret)
            client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
    else:
        print('Queue is currently empty or messages are invisible')
        break

Anda bisa mendapatkan skrip ini di sini tautan

skrip ini pada dasarnya dapat memindahkan pesan antar antrian sembarang. dan mendukung antrian fifo serta Anda dapat menyediakan message_group_idlapangan.

linehrr
sumber
3

Kami menggunakan skrip berikut untuk memindahkan pesan dari antrian src ke antrian tgt:

nama file: redrive.py

pemakaian: python redrive.py -s {source queue name} -t {target queue name}

'''
This script is used to redrive message in (src) queue to (tgt) queue

The solution is to set the Target Queue as the Source Queue's Dead Letter Queue.
Also set Source Queue's redrive policy, Maximum Receives to 1. 
Also set Source Queue's VisibilityTimeout to 5 seconds (a small period)
Then read data from the Source Queue.

Source Queue's Redrive Policy will copy the message to the Target Queue.
'''
import argparse
import json
import boto3
sqs = boto3.client('sqs')


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--src', required=True,
                        help='Name of source SQS')
    parser.add_argument('-t', '--tgt', required=True,
                        help='Name of targeted SQS')

    args = parser.parse_args()
    return args


def verify_queue(queue_name):
    queue_url = sqs.get_queue_url(QueueName=queue_name)
    return True if queue_url.get('QueueUrl') else False


def get_queue_attribute(queue_url):
    queue_attributes = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['All'])['Attributes']
    print(queue_attributes)

    return queue_attributes


def main():
    args = parse_args()
    for q in [args.src, args.tgt]:
        if not verify_queue(q):
            print(f"Cannot find {q} in AWS SQS")

    src_queue_url = sqs.get_queue_url(QueueName=args.src)['QueueUrl']

    target_queue_url = sqs.get_queue_url(QueueName=args.tgt)['QueueUrl']
    target_queue_attributes = get_queue_attribute(target_queue_url)

    # Set the Source Queue's Redrive policy
    redrive_policy = {
        'deadLetterTargetArn': target_queue_attributes['QueueArn'],
        'maxReceiveCount': '1'
    }
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '5',
            'RedrivePolicy': json.dumps(redrive_policy)
        }
    )
    get_queue_attribute(src_queue_url)

    # read all messages
    num_received = 0
    while True:
        try:
            resp = sqs.receive_message(
                QueueUrl=src_queue_url,
                MaxNumberOfMessages=10,
                AttributeNames=['All'],
                WaitTimeSeconds=5)

            num_message = len(resp.get('Messages', []))
            if not num_message:
                break

            num_received += num_message
        except Exception:
            break
    print(f"Redrive {num_received} messages")

    # Reset the Source Queue's Redrive policy
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '30',
            'RedrivePolicy': ''
        }
    )
    get_queue_attribute(src_queue_url)


if __name__ == "__main__":
    main()
menrfa
sumber
0

DLQ mulai berlaku hanya jika konsumen asli gagal menggunakan pesan dengan sukses setelah berbagai upaya. Kami tidak ingin menghapus pesan karena kami yakin kami masih dapat melakukan sesuatu dengannya (mungkin mencoba memproses lagi atau mencatatnya atau mengumpulkan beberapa statistik) dan kami tidak ingin terus menemukan pesan ini berulang kali dan menghentikan kemampuan untuk memproses pesan lain di balik pesan ini.

DLQ tidak lain hanyalah antrian lain. Yang berarti kita perlu menulis konsumen untuk DLQ yang idealnya berjalan lebih jarang (dibandingkan dengan antrian asli) yang akan mengkonsumsi dari DLQ dan menghasilkan pesan kembali ke antrian asli dan menghapusnya dari DLQ - jika itu perilaku yang diinginkan dan menurut kami konsumen asli sekarang siap untuk memprosesnya lagi. Seharusnya tidak masalah jika siklus ini berlanjut untuk sementara waktu karena sekarang kami juga mendapatkan kesempatan untuk memeriksa secara manual dan membuat perubahan yang diperlukan dan menerapkan versi lain dari konsumen asli tanpa kehilangan pesan (tentu saja dalam periode retensi pesan - yaitu 4 hari sebelum default).

Alangkah baiknya jika AWS menyediakan kemampuan ini di luar kotak tetapi saya belum melihatnya - mereka menyerahkan ini kepada pengguna akhir untuk menggunakannya dengan cara yang mereka rasa sesuai.

rd2
sumber