Bagaimana cara mengimpor data dari mongodb ke panda?

99

Saya memiliki sejumlah besar data dalam koleksi di mongodb yang perlu saya analisis. Bagaimana cara mengimpor data itu ke panda?

Saya baru mengenal panda dan numpy.

EDIT: Koleksi mongodb berisi nilai sensor yang ditandai dengan tanggal dan waktu. Nilai sensor adalah tipe data float.

Contoh data:

{
"_cls" : "SensorReport",
"_id" : ObjectId("515a963b78f6a035d9fa531b"),
"_types" : [
    "SensorReport"
],
"Readings" : [
    {
        "a" : 0.958069536790466,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:26:35.297Z"),
        "b" : 6.296118156595,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95574014778624,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:09.963Z"),
        "b" : 6.29651468650064,
        "_cls" : "Reading"
    },
    {
        "a" : 0.953648289182713,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:27:37.545Z"),
        "b" : 7.29679823731148,
        "_cls" : "Reading"
    },
    {
        "a" : 0.955931884300997,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:28:21.369Z"),
        "b" : 6.29642922525632,
        "_cls" : "Reading"
    },
    {
        "a" : 0.95821381,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:20.801Z"),
        "b" : 7.28956613,
        "_cls" : "Reading"
    },
    {
        "a" : 4.95821335,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:41:36.931Z"),
        "b" : 6.28956574,
        "_cls" : "Reading"
    },
    {
        "a" : 9.95821341,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:42:09.971Z"),
        "b" : 0.28956488,
        "_cls" : "Reading"
    },
    {
        "a" : 1.95667927,
        "_types" : [
            "Reading"
        ],
        "ReadingUpdatedDate" : ISODate("2013-04-02T08:43:55.463Z"),
        "b" : 0.29115237,
        "_cls" : "Reading"
    }
],
"latestReportTime" : ISODate("2013-04-02T08:43:55.463Z"),
"sensorName" : "56847890-0",
"reportCount" : 8
}
Nithin
sumber
Menggunakan jenis bidang khusus dengan MongoEngine dapat membuat penyimpanan dan pengambilan Pandas DataFrames sesederhanamongo_doc.data_frame = my_pandas_df
Jthorpe

Jawaban:

133

pymongo mungkin bisa membantu Anda, berikut adalah beberapa kode yang saya gunakan:

import pandas as pd
from pymongo import MongoClient


def _connect_mongo(host, port, username, password, db):
    """ A util for making a connection to mongo """

    if username and password:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db)
        conn = MongoClient(mongo_uri)
    else:
        conn = MongoClient(host, port)


    return conn[db]


def read_mongo(db, collection, query={}, host='localhost', port=27017, username=None, password=None, no_id=True):
    """ Read from Mongo and Store into DataFrame """

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df
waitingkuo
sumber
Terima kasih, ini adalah metode yang akhirnya saya gunakan. Saya juga memiliki berbagai dokumen yang disematkan di setiap baris. Jadi saya harus mengulanginya juga di setiap baris. Apakah ada cara yang lebih baik untuk melakukan ini ??
Nithin
Apakah mungkin untuk memberikan beberapa contoh struktur mongodb Anda?
waitingkuo
3
Perhatikan bagian list()dalam df = pd.DataFrame(list(cursor))mengevaluasi sebagai daftar atau generator, untuk menjaga CPU tetap dingin. Jika Anda memiliki milyaran satu item data, dan beberapa baris berikutnya akan cukup dipartisi, level-of-detail, dan dipotong, seluruh shmegegge masih aman untuk dimasukkan. Bagus.
Phlip
2
Ini sangat lambat @ df = pd.DataFrame(list(cursor)). Quering db murni jauh lebih cepat. Bisakah kita mengubah listtransmisi ke yang lain?
Peter.k
1
@Peter kalimat itu juga menarik perhatian saya. Mentransmisikan kursor database, yang dirancang agar dapat diulang dan berpotensi membungkus data dalam jumlah besar, ke dalam daftar dalam memori tampaknya tidak pintar bagi saya.
Rafa
41

Anda dapat memuat data mongodb Anda ke DataFrame pandas menggunakan kode ini. Ini bekerja untuk saya. Semoga untuk Anda juga.

import pymongo
import pandas as pd
from pymongo import MongoClient
client = MongoClient()
db = client.database_name
collection = db.collection_name
data = pd.DataFrame(list(collection.find()))
saimadhu.polamuri
sumber
24

Monarymelakukan persis seperti itu, dan ini sangat cepat . ( tautan lain )

Lihat posting keren ini yang menyertakan tutorial singkat dan beberapa pengaturan waktu.

shx2
sumber
Apakah Monary mendukung tipe data string?
Snehal Parmar
Saya mencoba Monary, tetapi itu memakan banyak waktu. Apakah saya melewatkan beberapa pengoptimalan? Mencoba client = Monary(host, 27017, database="db_tmp") columns = ["col1", "col2"] data_type = ["int64", "int64"] arrays = client.query("db_tmp", "coll", {}, columns, data_type)Untuk 50000catatan mengambil sekitar 200s.
nishant
Kedengarannya sangat lambat ... Terus terang, saya tidak tahu apa status proyek ini, sekarang, 4 tahun kemudian ...
shx2
16

Sesuai PEP, sederhana lebih baik daripada rumit:

import pandas as pd
df = pd.DataFrame.from_records(db.<database_name>.<collection_name>.find())

Anda dapat menyertakan kondisi seperti Anda bekerja dengan database mongoDB biasa atau bahkan menggunakan find_one () untuk hanya mendapatkan satu elemen dari database, dll.

dan voila!

Cy Bu
sumber
pd.DataFrame.from_records tampaknya sama lambatnya dengan DataFrame (list ()), tetapi hasilnya sangat tidak konsisten. %% waktu menunjukkan apa pun dari 800 md hingga 1,9 d
AFD
1
Ini tidak baik untuk catatan besar karena ini tidak menunjukkan kesalahan memori, instread membuat sistem hang karena data yang terlalu besar. sedangkan pd.DataFrame (daftar (kursor)) menunjukkan kesalahan memori.
Amulya Acharya
13
import pandas as pd
from odo import odo

data = odo('mongodb://localhost/db::collection', pd.DataFrame)
fengwt
sumber
9

Untuk menangani data out-of-core (tidak sesuai dengan RAM) secara efisien (yaitu dengan eksekusi paralel), Anda dapat mencoba ekosistem Python Blaze : Blaze / Dask / Odo.

Blaze (dan Odo ) memiliki fungsi out-of-the-box untuk menangani MongoDB.

Beberapa artikel bermanfaat untuk memulai:

Dan sebuah artikel yang menunjukkan hal-hal menakjubkan yang mungkin terjadi dengan tumpukan Blaze: Menganalisis 1,7 Miliar Komentar Reddit dengan Blaze dan Impala (pada dasarnya, menanyakan 975 Gb komentar Reddit dalam hitungan detik).

NB Saya tidak berafiliasi dengan salah satu teknologi ini.

Dennis Golomazov
sumber
1
Saya juga menulis posting menggunakan Jupyter Notebook dengan contoh bagaimana Dask membantu mempercepat eksekusi bahkan pada data yang pas ke memori dengan menggunakan banyak core pada satu mesin.
Dennis Golomazov
8

Opsi lain yang menurut saya sangat berguna adalah:

from pandas.io.json import json_normalize

cursor = my_collection.find()
df = json_normalize(cursor)

dengan cara ini Anda dapat membuka dokumen mongodb bersarang secara gratis.

Ikar Pohorský
sumber
2
Saya mendapat kesalahan dengan metode iniTypeError: data argument can't be an iterator
Gabriel Fair
2
Aneh, ini berfungsi pada python saya 3.6.7menggunakan panda 0.24.2. Mungkin Anda bisa mencobanya df = json_normalize(list(cursor))?
Ikar Pohorský
Untuk +1. docs, argumen max_level mendefinisikan level max kedalaman dict. Saya baru saja melakukan pengujian dan itu tidak benar, jadi beberapa kolom perlu dipisahkan dengan .str accesrors. Tetap saja, fitur yang sangat bagus untuk bekerja dengan mongodb.
Mauricio Maroto
5

Menggunakan

pandas.DataFrame(list(...))

akan memakan banyak memori jika hasil iterator / generator besar

lebih baik untuk menghasilkan potongan kecil dan concat di bagian akhir

def iterator2dataframes(iterator, chunk_size: int):
  """Turn an iterator into multiple small pandas.DataFrame

  This is a balance between memory and efficiency
  """
  records = []
  frames = []
  for i, record in enumerate(iterator):
    records.append(record)
    if i % chunk_size == chunk_size - 1:
      frames.append(pd.DataFrame(records))
      records = []
  if records:
    frames.append(pd.DataFrame(records))
  return pd.concat(frames)
Deo Leung
sumber
1

Mengikuti jawaban hebat ini oleh waitingkuo, saya ingin menambahkan kemungkinan melakukan itu menggunakan chunksize sejalan dengan .read_sql () dan .read_csv () . Saya memperbesar jawaban dari Deu Leung dengan menghindari pergi satu per satu setiap 'catatan' dari 'iterator' / 'kursor'. Saya akan meminjam fungsi read_mongo sebelumnya .

def read_mongo(db, 
           collection, query={}, 
           host='localhost', port=27017, 
           username=None, password=None,
           chunksize = 100, no_id=True):
""" Read from Mongo and Store into DataFrame """


# Connect to MongoDB
#db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)
client = MongoClient(host=host, port=port)
# Make a query to the specific DB and Collection
db_aux = client[db]


# Some variables to create the chunks
skips_variable = range(0, db_aux[collection].find(query).count(), int(chunksize))
if len(skips_variable)<=1:
    skips_variable = [0,len(skips_variable)]

# Iteration to create the dataframe in chunks.
for i in range(1,len(skips_variable)):

    # Expand the cursor and construct the DataFrame
    #df_aux =pd.DataFrame(list(cursor_aux[skips_variable[i-1]:skips_variable[i]]))
    df_aux =pd.DataFrame(list(db_aux[collection].find(query)[skips_variable[i-1]:skips_variable[i]]))

    if no_id:
        del df_aux['_id']

    # Concatenate the chunks into a unique df
    if 'df' not in locals():
        df =  df_aux
    else:
        df = pd.concat([df, df_aux], ignore_index=True)

return df
Rafael Valero
sumber
1

Pendekatan serupa seperti Rafael Valero, waitingkuo, dan Deu Leung menggunakan pagination :

def read_mongo(
       # db, 
       collection, query=None, 
       # host='localhost', port=27017, username=None, password=None,
       chunksize = 100, page_num=1, no_id=True):

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Calculate number of documents to skip
    skips = chunksize * (page_num - 1)

    # Sorry, this is in spanish
    # https://www.toptal.com/python/c%C3%B3digo-buggy-python-los-10-errores-m%C3%A1s-comunes-que-cometen-los-desarrolladores-python/es
    if not query:
        query = {}

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query).skip(skips).limit(chunksize)

    # Expand the cursor and construct the DataFrame
    df =  pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df
Jordy Cuan
sumber
0

Anda dapat mencapai apa yang Anda inginkan dengan pdmongo dalam tiga baris:

import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [], "mongodb://localhost:27017/mydb")

Jika data Anda sangat besar, Anda dapat melakukan kueri agregat terlebih dahulu dengan memfilter data yang tidak Anda inginkan, kemudian memetakannya ke kolom yang Anda inginkan.

Berikut adalah contoh pemetaan Readings.ake kolom adan pemfilteran berdasarkan reportCountkolom:

import pdmongo as pdm
import pandas as pd
df = pdm.read_mongo("MyCollection", [{'$match': {'reportCount': {'$gt': 6}}}, {'$unwind': '$Readings'}, {'$project': {'a': '$Readings.a'}}], "mongodb://localhost:27017/mydb")

read_mongomenerima argumen yang sama dengan agregat pymongo

pakallis
sumber