Hitung kecepatan rata-rata jalan [ditutup]

20

Saya pergi ke wawancara pekerjaan insinyur data. Pewawancara bertanya kepada saya. Dia memberi saya beberapa situasi dan meminta saya untuk mendesain aliran data untuk sistem itu. Saya memecahkannya tetapi dia tidak menyukai solusi saya dan saya gagal. Saya ingin tahu apakah Anda memiliki ide yang lebih baik bagaimana mengatasi tantangan itu.

Pertanyaannya adalah:

Sistem kami menerima empat aliran data. Data berisi id kendaraan, kecepatan, dan koordinasi geolokasi. Setiap kendaraan mengirimkan datanya satu menit sekali. Tidak ada hubungan antara aliran tertentu ke jalan atau kendaraan khusus atau apa pun. Ada fungsi yang menerima koordinasi dan mengembalikan nama bagian jalan. Kita perlu mengetahui kecepatan rata-rata per ruas jalan per 5 menit. Akhirnya kami ingin menulis hasilnya ke Kafka.

masukkan deskripsi gambar di sini

Jadi solusi saya adalah:

Pertama-tama, tulis semua data ke dalam cluster Kafka, ke dalam satu topik, dipartisi oleh 5-6 digit pertama garis bujur digabung menjadi 5-6 digit pertama garis bujur. Kemudian membaca data dengan Structured Streaming, menambahkan untuk setiap baris nama bagian jalan oleh koordinat (ada udf yang sudah ditentukan sebelumnya untuk itu), dan kemudian mengumpulkan data dengan nama bagian jalan.

Karena saya mempartisi data dalam Kafka dengan 5-6 digit pertama dari koordinasi, setelah menerjemahkan koordinat ke nama bagian, tidak perlu ada transfer banyak data ke partisi yang benar dan oleh karena itu saya dapat mengambil keuntungan dari operasi colesce () itu tidak memicu pengocokan penuh.

Kemudian menghitung kecepatan rata-rata per eksekutor.

Seluruh proses akan terjadi setiap 5 menit dan kami akan menulis data dalam mode Tambah ke wastafel Kafka akhir.

masukkan deskripsi gambar di sini

Jadi sekali lagi, pewawancara tidak menyukai solusi saya. Adakah yang bisa menyarankan cara memperbaikinya atau ide yang sama sekali berbeda dan lebih baik?

Alon
sumber
Bukankah lebih baik bertanya kepada orang itu apa sebenarnya yang tidak dia sukai?
Gino Pane
Saya pikir itu adalah ide yang buruk untuk dipartisi oleh lat-panjang bersambung. Tidakkah titik data dilaporkan untuk setiap jalur sebagai koordinat yang sedikit berbeda?
webber
@webber karena itu saya hanya mengambil beberapa digit, jadi posisinya tidak akan unik tetapi relatif dalam ukuran bagian jalan.
Alon

Jawaban:

6

Saya menemukan pertanyaan ini sangat menarik dan berpikir untuk mencobanya.

Saat saya mengevaluasi lebih lanjut, upaya Anda sendiri baik, kecuali yang berikut:

dipartisi oleh 5-6 digit pertama garis bujur yang disatukan dengan 5-6 digit pertama garis bujur

Jika Anda sudah memiliki metode untuk mendapatkan id / nama bagian jalan berdasarkan lintang dan bujur, mengapa tidak memanggil metode itu terlebih dahulu dan menggunakan id / nama bagian jalan untuk mempartisi data?

Dan setelah itu, semuanya cukup mudah, jadi topologi akan seperti itu

Merge all four streams ->
Select key as the road section id/name ->
Group the stream by Key -> 
Use time windowed aggregation for the given time ->
Materialize it to a store. 

(Penjelasan lebih rinci dapat ditemukan dalam komentar dalam kode di bawah ini. Silakan tanyakan jika ada yang tidak jelas)

Saya telah menambahkan kode di akhir jawaban ini, harap dicatat bahwa alih-alih rata-rata, saya telah menggunakan jumlah karena lebih mudah untuk ditunjukkan. Dimungkinkan untuk melakukan rata-rata dengan menyimpan beberapa data tambahan.

Saya sudah merinci jawabannya dalam komentar. Berikut ini adalah diagram topologi yang dihasilkan dari kode (terima kasih kepada https://zz85.github.io/kafka-streams-viz/ )

Topologi:

Diagram Topologi

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.Stores;
    import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    public class VehicleStream {
        // 5 minutes aggregation window
        private static final long AGGREGATION_WINDOW = 5 * 50 * 1000L;

        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();

            // Setting configs, change accordingly
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "vehicle.stream.app");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,kafka2:19092");
            properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            // initializing  a streambuilder for building topology.
            final StreamsBuilder builder = new StreamsBuilder();

            // Our initial 4 streams.
            List<String> streamInputTopics = Arrays.asList(
                    "vehicle.stream1", "vehicle.stream2",
                    "vehicle.stream3", "vehicle.stream4"
            );
            /*
             * Since there is no connection between a specific stream
             * to a specific road or vehicle or anything else,
             * we can take all four streams as a single stream
             */
            KStream<String, String> source = builder.stream(streamInputTopics);

            /*
             * The initial key is unimportant (which can be ignored),
             * Instead, we will be using the section name/id as key.
             * Data will contain comma separated values in following format.
             * VehicleId,Speed,Latitude,Longitude
             */
            WindowBytesStoreSupplier windowSpeedStore = Stores.persistentWindowStore(
                    "windowSpeedStore",
                    AGGREGATION_WINDOW,
                    2, 10, true
            );
            source
                    .peek((k, v) -> printValues("Initial", k, v))
                    // First, we rekey the stream based on the road section.
                    .selectKey(VehicleStream::selectKeyAsRoadSection)
                    .peek((k, v) -> printValues("After rekey", k, v))
                    .groupByKey()
                    .windowedBy(TimeWindows.of(AGGREGATION_WINDOW))
                    .aggregate(
                            () -> "0.0", // Initialize
                            /*
                             * I'm using summing here for the aggregation as that's easier.
                             * It can be converted to average by storing extra details on number of records, etc..
                             */
                            (k, v, previousSpeed) ->  // Aggregator (summing speed)
                                    String.valueOf(
                                            Double.parseDouble(previousSpeed) +
                                                    VehicleSpeed.getVehicleSpeed(v).speed
                                    ),
                            Materialized.as(windowSpeedStore)
                    );
            // generating the topology
            final Topology topology = builder.build();
            System.out.print(topology.describe());

            // constructing a streams client with the properties and topology
            final KafkaStreams streams = new KafkaStreams(topology, properties);
            final CountDownLatch latch = new CountDownLatch(1);

            // attaching shutdown handler
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);
        }


        private static void printValues(String message, String key, Object value) {
            System.out.printf("===%s=== key: %s value: %s%n", message, key, value.toString());
        }

        private static String selectKeyAsRoadSection(String key, String speedValue) {
            // Would make more sense when it's the section id, rather than a name.
            return coordinateToRoadSection(
                    VehicleSpeed.getVehicleSpeed(speedValue).latitude,
                    VehicleSpeed.getVehicleSpeed(speedValue).longitude
            );
        }

        private static String coordinateToRoadSection(String latitude, String longitude) {
            // Dummy function
            return "Area 51";
        }

        public static class VehicleSpeed {
            public String vehicleId;
            public double speed;
            public String latitude;
            public String longitude;

            public static VehicleSpeed getVehicleSpeed(String data) {
                return new VehicleSpeed(data);
            }

            public VehicleSpeed(String data) {
                String[] dataArray = data.split(",");
                this.vehicleId = dataArray[0];
                this.speed = Double.parseDouble(dataArray[1]);
                this.latitude = dataArray[2];
                this.longitude = dataArray[3];
            }

            @Override
            public String toString() {
                return String.format("veh: %s, speed: %f, latlong : %s,%s", vehicleId, speed, latitude, longitude);
            }
        }
    }
Irshad PI
sumber
Bukankah menggabungkan semua aliran adalah ide yang buruk? Ini bisa menjadi penghambat aliran data Anda. Apa yang terjadi ketika Anda mulai menerima semakin banyak input stream ketika sistem Anda tumbuh? Akankah ini scalable?
wypul
@wypul> bukankah menggabungkan semua aliran adalah ide yang buruk? -> Saya pikir tidak. Paralelisme di Kafka tidak dicapai melalui stream, tetapi melalui partisi (dan tugas), threading, dll. Streaming adalah cara untuk mengelompokkan data. > Apakah ini dapat diskalakan? -> ya. Karena kami mengetikkan berdasarkan bagian jalan dan dengan asumsi bagian jalan terdistribusi secara adil, kami dapat meningkatkan jumlah partisi untuk topik-topik ini untuk secara paralel memproses aliran dalam wadah yang berbeda. Kita dapat menggunakan algoritma pemartisian yang baik berdasarkan bagian jalan untuk mendistribusikan beban di seluruh replika.
Irshad PI
1

Masalah seperti itu tampaknya sederhana dan solusi yang ditawarkan sudah masuk akal. Saya bertanya-tanya apakah pewawancara prihatin dengan desain dan kinerja solusi yang Anda fokuskan atau keakuratan hasilnya. Karena yang lain berfokus pada kode, desain, dan kinerja, saya akan mempertimbangkan akurasinya.

Solusi Streaming

Karena data mengalir, kami dapat memberikan perkiraan kasar tentang kecepatan rata-rata jalan. Estimasi ini akan membantu dalam mendeteksi kemacetan tetapi tidak aktif dalam menentukan batas kecepatan.

  1. Gabungkan semua 4 aliran data secara bersamaan.
  2. Buat jendela 5 menit untuk mengambil data dari semua 4 aliran dalam 5 menit.
  3. Terapkan UDF pada koordinat untuk mendapatkan nama jalan dan nama kota. Nama jalan sering digandakan di seluruh kota, jadi kami akan menggunakan nama kota + nama jalan sebagai kunci.
  4. Hitung kecepatan rata-rata dengan sintaks seperti -

    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

5. write the result to the Kafka Topic

Solusi Batch

Estimasi ini akan dimatikan karena ukuran sampel kecil. Kami akan membutuhkan pemrosesan batch pada data bulan / kuartal / tahun penuh untuk lebih akurat menentukan batas kecepatan.

  1. Baca data tahun dari danau data (atau Topik Kafka)

  2. Terapkan UDF pada koordinat untuk mendapatkan nama jalan dan nama kota.

  3. Hitung kecepatan rata-rata dengan sintaks seperti -


    vehicle_street_speed
      .groupBy($"city_name_street_name")
      .agg(
        avg($"speed").as("avg_speed")
      )

  1. tulis hasilnya ke data lake.

Berdasarkan batas kecepatan yang lebih akurat ini, kami dapat memprediksi lalu lintas lambat dalam aplikasi streaming.

Salim
sumber
1

Saya melihat beberapa masalah dengan strategi partisi Anda:

  • Ketika Anda mengatakan Anda akan mempartisi data Anda berdasarkan 5-6 digit pertama dari panjang lat, Anda tidak akan dapat menentukan jumlah partisi kafka sebelumnya. Anda akan memiliki data yang miring karena untuk beberapa ruas jalan Anda akan mengamati volume yang tinggi daripada yang lain.

  • Dan kombinasi kunci Anda tidak menjamin data bagian jalan yang sama di partisi yang sama dan karenanya Anda tidak dapat yakin bahwa tidak akan ada pengocokan.

Informasi yang diberikan IMO tidak cukup untuk merancang seluruh pipa data. Karena ketika mendesain pipa, bagaimana Anda mempartisi data Anda memainkan peran penting. Anda harus menanyakan lebih lanjut tentang data yang Anda terima seperti jumlah kendaraan, ukuran aliran data input, apakah jumlah aliran tetap atau dapatkah peningkatan di masa mendatang? Apakah aliran data input yang Anda terima adalah aliran kafka? Berapa banyak data yang Anda terima dalam 5 menit?

  • Sekarang mari kita asumsikan bahwa Anda memiliki 4 stream yang ditulis untuk 4 topik dalam kafka atau 4 partisi dan Anda tidak memiliki kunci spesifik tetapi data Anda dipartisi berdasarkan beberapa kunci pusat data atau hash dipartisi. Jika tidak maka ini harus dilakukan pada sisi data daripada menduplikasi data dalam aliran dan partisi kafka lain.
  • Jika Anda menerima data di pusat data yang berbeda maka Anda perlu membawa data ke satu cluster dan untuk tujuan itu Anda dapat menggunakan pembuat cermin Kafka atau yang serupa.
  • Setelah Anda memiliki semua data pada satu cluster, Anda dapat menjalankan pekerjaan streaming terstruktur di sana dan dengan interval pemicu dan tanda air 5 menit berdasarkan kebutuhan Anda.
  • Untuk menghitung rata-rata dan menghindari banyak pengocokan, Anda dapat menggunakan kombinasi mapValuesdan reduceByKeybukannya groupBy. Lihat ini .
  • Anda bisa menulis data ke wastafel kafka setelah diproses.
wypul
sumber
Nilai map dan penguranganByKey termasuk dalam RDD tingkat rendah. Bukankah Catalyst cukup pintar untuk menghasilkan RDD paling efisien ketika saya mengelompokkan dan menghitung rata-rata?
Alon
@Alon Catalyst pasti akan dapat mengetahui rencana terbaik untuk menjalankan kueri Anda tetapi jika Anda menggunakan groupBy, data dengan kunci yang sama akan dikocok ke partisi yang sama terlebih dahulu dan kemudian menerapkan operasi agregat untuk itu. mapValuesdan reduceBymemang milik RDD tingkat rendah tetapi masih akan berkinerja lebih baik dalam situasi ini karena pertama-tama akan menghitung agregat per partisi dan kemudian melakukan pengocokan.
wypul
0

Masalah utama yang saya lihat dengan solusi ini adalah:

  • Bagian jalan yang berada di tepi kotak 6 digit peta akan memiliki data dalam beberapa partisi topik dan akan memiliki beberapa kecepatan rata-rata.
  • Ukuran data konsumsi untuk partisi Kafka Anda mungkin tidak seimbang (kota vs gurun). Pemisahan dengan id pertama digit mobil mungkin merupakan ide bagus IMO.
  • Tidak yakin saya mengikuti bagian menyatu, tapi kedengarannya bermasalah.

Saya akan mengatakan solusi yang perlu dilakukan: baca dari aliran Kafka -> UDF -> bagian jalan kelompok -> rata-rata -> tulis ke aliran Kafka.

David Taub
sumber
0

Desain saya akan tergantung pada

  1. Jumlah jalan
  2. Jumlah kendaraan
  3. Biaya perhitungan jalan dari koordinat

Jika saya ingin mengukur jumlah hitungan, desainnya akan terlihat seperti ini masukkan deskripsi gambar di sini

Kepedulian lintas pada desain ini -

  1. Pertahankan keadaan aliran input yang tahan lama (jika input adalah kafka, kami dapat menyimpan offset dengan Kafka atau secara eksternal)
  2. Status pos pemeriksaan berkala ke sistem eksternal (Saya lebih suka menggunakan penghalang pos pemeriksaan async di Flink )

Beberapa peningkatan praktis mungkin pada desain ini -

  1. Caching fungsi pemetaan ruas jalan jika memungkinkan, berdasarkan jalan
  2. Menangani ping yang terlewat (dalam praktiknya tidak setiap ping tersedia)
  3. Memperhitungkan kelengkungan jalan (bantalan dan ketinggian diperhitungkan)
yugandhar
sumber