技术饭
PHP的MongoDB\Driver\Manager实现对mongodb的操作
MongoDB是开发人员数据平台,提供必要的服务和工具,以快速、高性能和大规模地构建用户需求的分布式应用程序。MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。MongoDB是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。它支持的数据结构非常松散,是类似json的bson格式,因此可以存储比较复杂的数据类型。Mongo最大的特点是它支持的查询语言非常强大,其语法有点类似于面向对象的查询语言,几乎可以实现类似关系数据库单表查询的绝大部分功能,而且还支持对数据建立索引。
文档:https://www.mongodb.com/docs/
下载:https://www.mongodb.com/try/download/community-kubernetes-operator
管理工具:https://www.mongodb.com/products/compass、https://www.mongodb.com/try/download/database-tools
注意:mongodb安装完之后,通过:mongodb://localhost:27017 连接,如果无法连接,windows下可以在终端管理员执行 net start mongodb 启动服务
教程:
MongoDB 快速入门教程:https://www.bilibili.com/video/BV1wf4y1279C
45分钟快速入门MongoDB:https://www.bilibili.com/video/BV1M3411L7Fi
MongoDB零基础入门详细教程:https://www.bilibili.com/video/BV1fm4y1b7N4
一、基础
1)、概念
//查看版本
mongod --version
//cmd进入mongodb
mongosh // 如果有设置密码需要添加上
//设置账号密码
use admin
db.createuser({user:'test', pwd:'123456', roles: [{role:"root", db:"admin"}]}); //设置密码
mongodb://test:123456@localhost:27017
//显示db
show dbs
//选择db
use mydb
//显示table、collection
show tables
show collections
//创建集合(表)
db.createCollection("students")
//删除集合(表)
db.students.drop()
//删除db
db.dropDatabase()
//写入:insertOne、insertMany
db.products.insertOne({"product_id":200, "product_name":"学习Java", "product_price":1000})
//查询:find、findOne、findOneAndDelete、findOneAndReplace、findOneAndUpdate
db.products.find({"product_id":200},{"product_id":1}) // 第一个参数是条件,第二个参数是查询出来的字段
db.products.find({"_id":ObjectId("647b2a868383855d4a05be91")}) // 通过_id查询
db.products.find({"name":"aaa"},{"product_id":1}).count() // count() 统计
db.products.find({"name":"aaa"},{"product_id":1}).limit(2) // limit() 限制条数
db.products.find({"name":"aaa"},{"product_id":1}).sort({"id":-1}) // 排序:1-正序,-1-倒序
db.products.find({"name":{$regex: /phone/in}) // 正则查询
db.products.find({"name":/phone/in) // 正则查询
db.products.find({"product_price":{$gte:1000}}) // 比较查询 $lt、$gt、$gte、$lte
db.products.find({$or:[{"product_price":{$gt:1000}},{"product_price":{$lt:2000}}]}) // $and、$or
db.products.find({"product_price":{$in:[1000,8139]}}) // $all、$in
db.pro.find().sort({"monthly_price":-1}).skip(0).limit(5) // skip+limit 分页查询
//聚合查询:aggregate
db.products.aggregate(
[
{$match:{"product_id":100}}, // $match 对饮 where条件
{$skip:1}, // $skip 对应 offset
{$limit:20}, // $limit 对应 limit
{$sort:{"product_id":-1}}, // $sort 对应 order by
{$look:{ // $look 对应 join
from: "collection", // 关联表
localField:"id", // 主键
foreignField:"product_id", // 外键
as:"student" // 输出名称
}},
{$group:{ // $group 对应 GROUP BY
_id:"$source", // 分组字段
asField:{$sum:"age"} // 统计输出字段值:$sum、$avg、$min、$max、$push、$addToSet、$first、$last
}},
{$unwind:}, // 展开数组
{$project:{ // $project 对应 字段 as
"名称":'$product_name',
'价格':'$product_price'
}}
]
)
//aggregate实例
db.products.aggregate([{$group:{_id:"$product_id",num_field:{$sum:"$product_price"}}}]) // 分组,字段有个$符号
//map-reduce:计算模型
db.collection.mapReduce(
function() {
emit(key,valqe);
}, # map 函数
function(key,values){
return reduceFunction;
}, # reduce 函数
{
out:collection,
query: document,
sort: document,
limit: number
}
)
//第一个函数表示map,第二个函数表示reduce,第三个参数 query:表示查询条件 out:表示输入结果
db.collection.mapReduce(
function() {
emit(this.name, this.score);
},
function(key,values) {
return Array.sum(values);
},
{
query:{Leve1:"A"},
out:"total_score"
}
)
//更新:updateOne、updateMany
db.collection.updateOne(query,update,{upsert:false,multi:false,writeConcern:false});
upsert: 可选,这个参数的意思是,如果不存在update的记录,是否插入objNew,true为插入,默认是false,不插入
multi: 可选,mongodb 默认是false,只更新找到的第一条记录,如果这个参数为true,就把按条件查出来多条记录全部更新
writeConcern: 可选,抛出异常的级别
//批量写入、更新操作:db.collection.bulkwrite
db.collection.bulkwrite(
[
insertOne:{
"document":{}
},
insertMany:[{},{}],
updateOne:{
"filter":{},
"update":{$set:{}}
},
updateMany:[{},{}]],
deleteOne:{},
deleteMany:[{},{}],
replaceOne:{}
],
{
writeConcern: {}, // 可选,省略则使用默认的 write concern
ordered: true // 可选,表示mongod实例有序还是无序执行操作。默认值true
}
)
db.products.updateOne({"product_id":200},{$set: {"product_name":"学习Java基础"}}) // $set
db.products.updateOne({"product_id":123},{$inc: {"product_price":-10}}) // $inc
db.products.updateOne({"product_id":123},{$push: {"product_name":2}}) // $push 数组添加
//删除:deleteMany、deleteOne
db.products.deleteOne({"_id":"647b2a868383855d4a05be91"})
db.products.updateOne({"product_id":123},{$unset: {"product_name":1}}) // $unset 删除字段
db.products.updateOne({"product_id":123},{$pull: {"product_name":2}}) // $pull 数组删除
//导入:https://github.com/ozlerhakan/mongodb-json-files/blob/master/datasets/products.json
mongoimport --host localhost --port 27017 --db mydb --type json ./pro.json (也可以用jsonArray的形式导入)
//导出
mongoexport -h localhost -p 27017 -d mydb --c pro -o pro1.json
mongoexport -h localhost --port 27017 -d mydb -c pro -o pro1.json
//索引
db.products.getIndexes() // 获取索引
db.products.createIndex({"product_id":-1}) // 单字段索引,倒序
db.products.createIndex({"product_id":-1,"product_price":1}) // 复合索引
db.products.createIndex({"info.name":-1}) // 多键索引,数组内部字段
db.products.createIndex({"product_name":"text"},{backgroud:true}) // 全文索引,后台执行
db.products.find({$text:{$search:"PHP"}}) // 全文索引查询
db.products.createIndex({"loc":"2dsphere"}) // 地图、坐标 2dsphere 索引
db.products.createIndex({"product_name":1},{unique:true}) // 唯一索引
db.products.createIndex({"product_name":1},{expireAferSecs:60*60*24}) // 时间索引
db.products.createIndex({"age":1},{partialFilterExpression:{$gt:{"age":10}}}) // 局部索引,年纪索引,但是必须大于10岁
db.products.createIndex({"age":1},{sparse:true) // 稀疏索引,跳过不存在age的文档索引
db.products.dropIndex("product_name_text") // 删除索引,通过名称删除
db.products.dropIndex({"product_id":-1}) // 删除索引,通过索引ID删除
//mongodb复制集
进入一个节点作为主节点、复制集配置
config = [
"_id":"mongodb-replicaset",
"members":[
{"_id":0, "host":"mongoset1:27017"},
{"_id":1, "host":"mongoset2:27018"},
{"_id":2, "host":"mongoset3:27019"
]}
//初始化配置
rs.initiate(config)
//查看复制集配置
rs.config()
//mongodb事务-写入
writeConcern决定一个写操作落在多少个节点上才算成功,writeConcern的取值包括
0: 发起写操作,不关心是否成功
1-n: 集群最大数据节点数: 写操作需要复制到指定节点数才算成功
majority: 写操作需要复制到大多数节点数才算成功
发起写操作的程序将阻塞到写操作到达指定的节点数为止
虽然多于半数的writeConcern都是安全的,但通常只会设置majority,因为这是等待写入延迟时间最短的选择
不要设置writeConcern等于总节点数,因为一但有一个节点故障,所有写操作都将失败
writeConcern虽然会增加写操作的延迟时间,但并不会显著增加群集的压力,因此无论是否等待,写操作最终都会复制到所有的节点上。设置writeConcern只是让写操作等待复制后再返回而已;
应对重要的数据应用{w: majority},普通数据可以应用w:1确保最佳性能;
//mongodb事务-读取
readPreference决定使用哪一个节点来满足正在发起的读请求。可选值包括:
primary:只选择主节点
primaryPerferred:优先选择主节点,如何不可用则选择从节点
seconday:只选择从节点
secondayPerferred:优先选择从节点,如何不可用则选择主节点
nearest:选择最近的节点
db.test.find().readPref("primary") // 主节点读
db.test.find().readPref("secondary") # 从节点读
二、PHP操作类
1)、基础类
MongoDB\Driver\Manager 入口类,负责维护与MongoDB的连接,执行读写和命令
MongoDB\Driver\BulkWrite 收集要发送到服务器的一个或多个写操作
MongoDB\Driver\Query 构造查询对象
MongoDB\Driver\Cursor 封装MongoDB命令或查询的结果
MongoDB\Driver\WriteResult 封装执行结果
MongoDB\Driver\Exception\BulkWriteException 写入异常
MongoDB\BSON\ObjectId 生成_id
2)、代码
<?php
use MongoDB\Driver\Manager;
use MongoDB\Driver\BulkWrite;
use MongoDB\Driver\Exception\BulkWriteException;
use MongoDB\Driver\Query;
use MongoDB\BSON\ObjectId;
//表
const TABLE = "mydb.products";
/**
* 输出打印
* @param $data
* @return void
*/
function P($data = []){
if(is_array($data) || is_object($data)){
echo "<pre>";
print_r($data);
echo "</pre>\n";
} else {
echo $data, PHP_EOL;
}
}
/**
* 写入
* @return void
*/
function create(){
// 连接mongodb
$manager = new Manager("mongodb://localhost:27017");
//bulk实例
$bulk = new BulkWrite();
$bulk->insert(['product_id' => 123, 'product_name' => 'Gregory 格里高利登山包B75蓝色新款', 'product_price' => 2139.00]);
$bulk->insert(['product_id' => 124, 'product_name' => 'Osprey 小鹰登山包Aether AG苍穹 70L蓝色', 'product_price' => 2099.00]);
$bulk->insert(['product_id' => 125, 'product_name' => 'GRANITE GEAR 花岗岩登山包光环追踪85L红色', 'product_price' => 1800]);
$bulk->insert(['product_id' => 126, 'product_name' => 'ARCTERYX 始祖鸟登山包Bora AR 50L婆罗洲蓝', 'product_price' => 4450.00]);
$bulk->insert(['product_id' => 128, 'product_name' => 'PHP 是世界上最棒的语言', 'product_price' => 8139.00]);
try {
$result = $manager->executeBulkWrite(TABLE, $bulk);
P($result->getInsertedCount());
} catch (BulkWriteException $e){
P($e->getWriteResult()->getWriteErrors());
}
}
/**
* 删除
* @return void
*/
function delete(){
//连接mongodb
$manager = new Manager("mongodb://localhost:27017");
//bulk实例
$bulk = new BulkWrite();
$bulk->delete(["product_id" => 126]);
try {
$result = $manager->executeBulkWrite(TABLE, $bulk);
P($result->getDeletedCount());
} catch (BulkWriteException $e) {
P($e->getWriteResult()->getWriteErrors());
}
}
/**
* 修改
* @return void
*/
function update(){
//连接mongodb
$manager = new Manager("mongodb://localhost:27017");
//bulk实例
$bulk = new BulkWrite();
$id = new ObjectId("647b2a868383855d4a05be92");
$bulk->update(["_id" => $id], ['$set' => ["product_id" => 100, "product_name" => "GoLang"]]);
try {
$result = $manager->executeBulkWrite(TABLE, $bulk);
P($result->getModifiedCount());
} catch (BulkWriteException $e) {
P($e->getWriteResult()->getWriteErrors());
}
}
/**
* 查询
* @return void
*/
function get(){
//连接mongodb
$manager = new Manager("mongodb://localhost:27017");
//查询实例
$filter = [
"product_id" => ['$gt' => 123]
];
$options = [
"projection" => ["_id" => 0, "product_id" => 1, "product_price" => 1], //0-排除,1-取出
"sort" => ["product_id" => -1] // -1-倒序,1-正序
];
$query = new Query($filter, $options);
$cursor = $manager->executeQuery(TABLE, $query);
foreach ($cursor as $key => $data){
P($data);
}
}
分页查询:https://www.php.net/manual/zh/mongodb-driver-manager.executequery
$cursor = $Manager->executeQuery(TABLE, new Query($query, ["skip" => 1,"limit" => 3, "sort" => array("_id" => -1), 'projection'=> $projection ? array_fill_keys($projection, 1) : []]), $Session->options());
3)、mongodb事务:https://www.cnblogs.com/luoyunfei99/articles/17225827.html
经过查阅,发现当MongoDB为单例运行时是无法完成回滚操作的,只有在开启副本集的时候才能使用。
解决方案
Mongodb配置文件
replication:
oplogSizeMB: 2000
replSetName: rs0
enableMajorityReadConcern: true
Mongodb集群初始化
连接进入mongo,使用rs.initiate()初始化副本集。
public function mongodbtest(){
//连接 mongodb
$manager = new \MongoDB\Driver\Manager('mongodb://172.16.100.191:27017');
//开启session
//causalConsistency boolean 因果一致性 如果操作依赖先前的操作需要设置为true
$session = $manager->startSession(['causalConsistency' => true]);
//开启事务
$session->startTransaction();
//下面执行两个写操作
//批量插入/删除/更新操作
//ordered 有序操作(TRUE)在MongoDB服务器上串行执行,而无序操作(FALSE)以任意顺序发送到服务器,并且可以并行执行。
$bulk1 = new \MongoDB\Driver\BulkWrite(['ordered' => true]);
$bulk1->insert(['name' => 'insert', 'age' => 44]);
$writeConcern1 = new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000);
//执行批量插入/删除/更新操作
//第一个参数 databaseName.collectionName
//第二个参数 要执行的操作
$manager->executeBulkWrite('huaxiang.user', $bulk1, ['session' => $session, 'writeConcern' => $writeConcern1]);
$bulk2 = new \MongoDB\Driver\BulkWrite(['ordered' => true]);
$bulk2->insert(['id' => 2]);
$writeConcern2 = new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000);
$manager->executeBulkWrite('huaxiang.coin', $bulk2, ['session' => $session, 'writeConcern' => $writeConcern2]);
//提交事务
//$session->commitTransaction();
//事务回滚
$session->abortTransaction();
}
参考:
MongoDB\Driver\Manager::executeQuery:https://www.php.net/manual/zh/mongodb-driver-manager.executequery
PHP7使用MongoDB\Driver\Manager类实现增删改查:https://blog.csdn.net/weixin_43910923/article/details/89606752
文明上网理性发言!