How to Import Data from MongoDB to Pandas

How do I import data from MongoDB into pandas?

pymongo might give you a hand; the following is some code I'm using:

import pandas as pd
from pymongo import MongoClient

def _connect_mongo(host, port, username, password, db):
    """ A util for making a connection to mongo """

    if username and password:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db)
        conn = MongoClient(mongo_uri)
    else:
        conn = MongoClient(host, port)

    return conn[db]


def read_mongo(db, collection, query={}, host='localhost', port=27017, username=None, password=None, no_id=True):
    """ Read from Mongo and Store into DataFrame """

    # Connect to MongoDB
    db = _connect_mongo(host=host, port=port, username=username, password=password, db=db)

    # Make a query to the specific DB and Collection
    cursor = db[collection].find(query)

    # Expand the cursor and construct the DataFrame
    df = pd.DataFrame(list(cursor))

    # Delete the _id
    if no_id:
        del df['_id']

    return df
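
A minimal usage sketch for the helper above; the database name, collection name and query are placeholders, not values from the original code:

# Hypothetical example: adjust the names and the filter to your own data.
df = read_mongo(
    db='mydb',
    collection='mycollection',
    query={'status': 'active'},
    host='localhost',
    port=27017,
)
print(df.head())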

How can I load data from a MongoDB collection into a pandas DataFrame?

Materialize the cursor you get from MongoDB (for example with list()) before passing it to DataFrame:

import pandas as pd
df = pd.DataFrame(list(tweets.find()))
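
The same pattern works with a filter and a projection; a small sketch assuming tweets is a pymongo collection (the field names are placeholders):

import pandas as pd

# Fetch only English tweets and project the fields we care about.
cursor = tweets.find({'lang': 'en'}, {'_id': 0, 'id': 1, 'text': 1})
df = pd.DataFrame(list(cursor))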

Import specific data from MongoDB into a pandas DataFrame

Try this:

from pandas import json_normalize

df = json_normalize(list(
    collection.aggregate([
        {
            "$match": query
        },
        {
            "$replaceRoot": {
                "newRoot": "$statement"
            }
        }
    ])
))
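
As a sanity check of why json_normalize helps here, a minimal standalone sketch (no MongoDB required) showing how it flattens nested dictionaries into dotted column names; the documents and fields are made up:

import pandas as pd

# Toy documents shaped roughly like the output of the aggregation above.
docs = [
    {'amount': 10, 'meta': {'currency': 'USD', 'source': 'api'}},
    {'amount': 25, 'meta': {'currency': 'EUR', 'source': 'batch'}},
]
flat = pd.json_normalize(docs)
print(flat.columns.tolist())  # ['amount', 'meta.currency', 'meta.source']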

Putting data from MongoDB into a chart in Python (using a pandas DataFrame)

You can try changing the data types of the DataFrame columns, for example:

df_mongo['Production'] = df_mongo['Production'].astype(int)

df_mongo['DateTime'] = pd.to_datetime(df_mongo['DateTime'])
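
With the columns converted, a quick plotting sketch; this assumes matplotlib is installed and reuses the df_mongo, 'DateTime' and 'Production' names from the snippet above:

import matplotlib.pyplot as plt

# Plot production over time using the converted columns.
df_mongo.plot(x='DateTime', y='Production', kind='line')
plt.show()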

MongoDB collection to pandas DataFrame

Update

I broke out the ol' Python to give this a crack - the following code works flawlessly!

from pymongo import MongoClient
import pandas as pd

uri = "mongodb://<your_mongo_uri>:27017"
database_name = "<your_database_name>"
collection_name = "<your_collection_name>"

mongo_client = MongoClient(uri)
database = mongo_client[database_name]
collection = database[collection_name]

# I used this code to insert a doc into a test collection
# before querying (just in case you wanted to know lol)
"""
data = {
"_id": 1,
"name": "Growth Lead Momentum",
"factors": [
{
"factorId": "C24",
"index": 0,
"weight": 1
},
{
"factorId": "D74",
"index": 7,
"weight": 9
}
]
}

insert_result = collection.insert_one(data)
print(insert_result)
"""

# This is the query that
# answers your question

results = collection.aggregate([
    {
        "$unwind": "$factors"
    },
    {
        "$project": {
            "_id": 1,  # Change to 0 if you wish to ignore the "_id" field.
            "name": 1,
            "factorId": "$factors.factorId",
            "index": "$factors.index",
            "weight": "$factors.weight"
        }
    }
])

# This is how we turn the results into a DataFrame.
# We can simply pass `list(results)` into `DataFrame(..)`,
# due to how our query works.

results_as_dataframe = pd.DataFrame(list(results))
print(results_as_dataframe)

Which outputs:

   _id                  name factorId  index  weight
0    1  Growth Lead Momentum      C24      0       1
1    1  Growth Lead Momentum      D74      7       9

Original Answer

You could use the aggregation pipeline to unwind factors and then project the fields you want.

Something like this should do the trick.

Database Structure

[
  {
    "_id": 1,
    "name": "Growth Lead Momentum",
    "factors": [
      {
        "factorId": "C24",
        "index": 0,
        "weight": 1
      },
      {
        "factorId": "D74",
        "index": 7,
        "weight": 9
      }
    ]
  }
]

Query

db.collection.aggregate([
  {
    $unwind: "$factors"
  },
  {
    $project: {
      _id: 1,
      name: 1,
      factorId: "$factors.factorId",
      index: "$factors.index",
      weight: "$factors.weight"
    }
  }
])

Results

(.csv friendly)

[
  {
    "_id": 1,
    "factorId": "C24",
    "index": 0,
    "name": "Growth Lead Momentum",
    "weight": 1
  },
  {
    "_id": 1,
    "factorId": "D74",
    "index": 7,
    "name": "Growth Lead Momentum",
    "weight": 9
  }
]
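
Because the unwound documents are flat, loading these results into pandas and writing a CSV is direct; a short sketch, where the output file name is a placeholder and results is the cursor returned by the aggregation in the update above:

import pandas as pd

df = pd.DataFrame(list(results))
df.to_csv('factors.csv', index=False)  # hypothetical output file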

A better way to load MongoDB data to a DataFrame using Pandas and PyMongo?

I've modified my code to the following:

cursor = tweets.find(fields=['id'])
tweet_fields = ['id']
result = DataFrame(list(cursor), columns = tweet_fields)

By adding the fields parameter to the find() call I restricted the output, which means I'm not loading every field, only the selected ones, into the DataFrame. Everything works fine now.
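
Note that the fields keyword argument was removed in PyMongo 3; on current versions the equivalent is the projection argument (the second positional argument of find()). A small sketch assuming the same tweets collection:

import pandas as pd

# Restrict the returned fields via a projection instead of the removed fields= keyword.
cursor = tweets.find({}, projection=['id'])   # or: tweets.find({}, {'id': 1})
df = pd.DataFrame(list(cursor), columns=['id'])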

OOM when reading data to Pandas from MongoDB using pymongo client

I have found a solution with multiprocessing, and it is the fastest:

import multiprocessing as mp
from time import time

import pandas as pd
from pymongo import MongoClient

# HOST, PORT, DB_NAME, COLLECTION_NAME and get_collection_size() are defined elsewhere.

def chunks(collection_size, n_cores=mp.cpu_count()):
    """ Return chunks of (start, stop) tuples """

    batch_size = round(collection_size / n_cores)
    rest = collection_size % batch_size
    cumulative = 0
    for i in range(n_cores):
        cumulative += batch_size
        if i == n_cores - 1:
            yield (batch_size * i, cumulative + rest)
        else:
            yield (batch_size * i, cumulative)

def parallel_read(skipses, host=HOST, port=PORT):
    print('Starting process on range of {} to {}'.format(skipses[0], skipses[1]))
    client = MongoClient(host, port)
    db = client[DB_NAME]
    collection = db[COLLECTION_NAME]

    cursor = collection.find({}, {'_id': False})
    _df = pd.DataFrame(list(cursor[skipses[0]:skipses[1]]))
    return _df

def read_mongo(colc_size, _workers=mp.cpu_count()):
    temp_df = pd.DataFrame()
    pool = mp.Pool(processes=_workers)
    results = [pool.apply_async(parallel_read, args=(chunk,)) for chunk in chunks(colc_size, n_cores=_workers)]
    output = [p.get() for p in results]
    temp_df = pd.concat(output)
    return temp_df

time_0 = time()
df = read_mongo(get_collection_size())
print("Reading database with {} processes took {}".format(mp.cpu_count(), time() - time_0))

Starting process on range of 0 to 53866
Starting process on range of 323196 to 377062
Starting process on range of 430928 to 484794
Starting process on range of 538660 to 592526
Starting process on range of 377062 to 430928
Starting process on range of 700258 to 754124
Starting process on range of 53866 to 107732
Starting process on range of 484794 to 538660
Starting process on range of 592526 to 646392
Starting process on range of 646392 to 700258
Starting process on range of 215464 to 269330
Starting process on range of 754124 to 807990
Starting process on range of 807990 to 915714
Starting process on range of 107732 to 161598
Starting process on range of 161598 to 215464
Starting process on range of 269330 to 323196

Reading database with 16 processes took 142.64860558509827
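
If cursor slicing is not available in your PyMongo version, an equivalent sketch of parallel_read that uses the explicit skip() and limit() cursor methods (same placeholder constants as above):

def parallel_read(skipses, host=HOST, port=PORT):
    start, stop = skipses
    print('Starting process on range of {} to {}'.format(start, stop))
    client = MongoClient(host, port)
    collection = client[DB_NAME][COLLECTION_NAME]

    # skip()/limit() reproduce the cursor[start:stop] slice explicitly.
    cursor = collection.find({}, {'_id': False}).skip(start).limit(stop - start)
    return pd.DataFrame(list(cursor))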

For comparison, with one of the examples above (no multiprocessing):

def iterator2dataframes(iterator, chunk_size: int):
    """Turn an iterator into multiple small pandas.DataFrame

    This is a balance between memory and efficiency
    """
    records = []
    frames = []
    for i, record in enumerate(iterator):
        records.append(record)
        if i % chunk_size == chunk_size - 1:
            frames.append(pd.DataFrame(records))
            records = []
    if records:
        frames.append(pd.DataFrame(records))
    return pd.concat(frames)

time_0 = time()
cursor = collection.find()
chunk_size = 1000
df = iterator2dataframes(cursor, chunk_size)
print("Reading database with chunksize = {} took {}".format(chunk_size,time()-time_0))

Reading database with chunksize = 1000 took 372.1170778274536

time_0 = time()
cursor = collection.find()
chunk_size = 10000
df = iterator2dataframes(cursor, chunk_size)
print("Reading database with chunksize = {} took {}".format(chunk_size,time()-time_0))

Reading database with chunksize = 10000 took 367.02637577056885

Import nested MongoDB data to pandas

You don't need to convert the nested structures using JSON parsers. Just create your DataFrame from the record list:

df = DataFrame(list(cursor))

and afterwards use pandas in order to unpack your lists and dictionaries:

import pandas
from itertools import chain
import numpy

df = pandas.DataFrame(t)
df['stage.value'] = df['stage'].apply(lambda cell: cell['value'])
df['stage.name'] = df['stage'].apply(lambda cell: cell['Name'])
df['q_'] = df['quality'].apply(lambda cell: [(m['type']['Name'], m['value'] if 'value' in m.keys() else 1) for m in cell])
df['q_'] = df['q_'].apply(lambda cell: dict((k, v) for k, v in cell))
keys = set(chain(*df['q_'].apply(lambda column: column.keys())))
for key in keys:
    column_name = 'q_{}'.format(key).lower()
    df[column_name] = df['q_'].apply(lambda cell: cell[key] if key in cell.keys() else numpy.nan)
df.drop(['stage', 'quality', 'q_'], axis=1, inplace=True)

I use three steps to unpack the nested data types. First, the names and values are combined into a flat list of (name, value) pairs (tuples). In the second step a dictionary is built from those tuples, taking keys from the first and values from the second position of each tuple. Then all existing property names are extracted once using a set. Each property gets a new column in a loop, and inside the loop the value of each pair is mapped to the respective column cells.
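
As an alternative for simple nesting, pandas' json_normalize can flatten subdocuments directly; a minimal sketch with toy records shaped like the 'stage' field above (the field names are taken from the snippet, the values are made up):

import pandas as pd

records = [
    {'_id': 1, 'stage': {'value': 3, 'Name': 'growth'}},
    {'_id': 2, 'stage': {'value': 5, 'Name': 'decline'}},
]
flat = pd.json_normalize(records)
print(flat.columns.tolist())  # ['_id', 'stage.value', 'stage.Name']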


