python操作MongoDB

python操作MongoDB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from pymongo import MongoClient
from dataclasses import dataclass
from bson.objectid import ObjectId


@dataclass
class Test_Mongo():
client:dict = MongoClient()
db:dict = client['test'] # 连接到 test 数据库

def add_one(self):
post = {
'name': 'justin',
'age': 18,
'sex': 'male',
'grade': 80
}
return self.db.students.insert_one(post)

def add_many(self):
return self.db.students.insert_many([{'name':i} for i in range(0,10)])


def get_one(self):
return self.db.students.find_one({'name':'justin'})

def get_more(self):
return self.db.students.find({'sex':'male'})

def get_one_from_oid(self, oid):
obj = ObjectId(oid)
return self.db.students.find_one({'_id': obj})

def get_count(self):
return self.db.students.estimated_document_count()


def update_one(self):
return self.db.students.update_one({'name':'justin'},{'$inc':{'age':10}})

def update_many(self):
return self.db.students.update_many({},{'$inc':{'age':5}})

def delete_one(self):
return self.db.students.delete_one({'name':'justin'})

def delete_many(self):
return self.db.students.delete_many({'name':'justin'})


def main():
obj = Test_Mongo()
print(obj.add_one().inserted_id)
print(obj.get_one())
print(obj.add_many().inserted_ids)
print(obj.get_count())
for item in obj.get_more():
print(item['_id'])
print(obj.get_one_from_oid(obj.add_one().inserted_id))
print(obj.update_one().matched_count)
print(obj.update_one().matched_count)
print(obj.update_many().matched_count)
print(obj.update_many().matched_count)
print(obj.delete_one().deleted_count)
print(obj.delete_many().deleted_count)


if __name__ == '__main__':
main()