技术饭

PHP的MongoDB\Driver\Manager实现对mongodb的操作

copylian    0 评论    19325 浏览    2023.06.01

MongoDB是开发人员数据平台,提供必要的服务和工具,以快速、高性能和大规模地构建用户需求的分布式应用程序。MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。MongoDB是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。它支持的数据结构非常松散,是类似json的bson格式,因此可以存储比较复杂的数据类型。Mongo最大的特点是它支持的查询语言非常强大,其语法有点类似于面向对象的查询语言,几乎可以实现类似关系数据库单表查询的绝大部分功能,而且还支持对数据建立索引。

官网:https://www.mongodb.com/

文档:https://www.mongodb.com/docs/

下载https://www.mongodb.com/try/download/community-kubernetes-operator

管理工具:https://www.mongodb.com/products/compasshttps://www.mongodb.com/try/download/database-tools

注意:mongodb安装完之后,通过:mongodb://localhost:27017 连接,如果无法连接,windows下可以在终端管理员执行 net start mongodb 启动服务

教程:

        MongoDB 快速入门教程:https://www.bilibili.com/video/BV1wf4y1279C

        45分钟快速入门MongoDB:https://www.bilibili.com/video/BV1M3411L7Fi

        MongoDB零基础入门详细教程:https://www.bilibili.com/video/BV1fm4y1b7N4

一、基础

1)、概念

微信图片_20230604000352.png

//查看版本

mongod --version


//cmd进入mongodb

mongosh // 如果有设置密码需要添加上


//设置账号密码

use admin

db.createuser({user:'test', pwd:'123456', roles: [{role:"root", db:"admin"}]}); //设置密码

mongodb://test:123456@localhost:27017


//显示db

show dbs


//选择db

use mydb


//显示table、collection

show tables

show collections


//创建集合(表)

db.createCollection("students")


//删除集合(表)

db.students.drop()


//删除db

db.dropDatabase()


//写入:insertOneinsertMany

db.products.insertOne({"product_id":200, "product_name":"学习Java", "product_price":1000})


//查询:find、findOne、findOneAndDelete、findOneAndReplace、findOneAndUpdate

db.products.find({"product_id":200},{"product_id":1}) // 第一个参数是条件,第二个参数是查询出来的字段

db.products.find({"_id":ObjectId("647b2a868383855d4a05be91")}) // 通过_id查询

db.products.find({"name":"aaa"},{"product_id":1}).count() // count() 统计

db.products.find({"name":"aaa"},{"product_id":1}).limit(2) // limit() 限制条数

db.products.find({"name":"aaa"},{"product_id":1}).sort({"id":-1}) // 排序:1-正序,-1-倒序

db.products.find({"name":{$regex: /phone/in}) // 正则查询

db.products.find({"name":/phone/in) // 正则查询

db.products.find({"product_price":{$gte:1000}}) // 比较查询 $lt、$gt、$gte、$lte

db.products.find({$or:[{"product_price":{$gt:1000}},{"product_price":{$lt:2000}}]}) // $and、$or

db.products.find({"product_price":{$in:[1000,8139]}}) // $all、$in

db.pro.find().sort({"monthly_price":-1}).skip(0).limit(5) // skip+limit 分页查询


//聚合查询:aggregate

db.products.aggregate(

  [

    {$match:{"product_id":100}}, // $match 对饮 where条件

    {$skip:1}, // $skip 对应 offset

    {$limit:20}, // $limit 对应 limit

    {$sort:{"product_id":-1}}, // $sort 对应 order by

    {$look:{  // $look 对应 join

          from: "collection", // 关联表

          localField:"id", // 主键

          foreignField:"product_id", // 外键

          as:"student" // 输出名称

    }},

    {$group:{ // $group 对应 GROUP BY

      _id:"$source", // 分组字段

      asField:{$sum:"age"} // 统计输出字段值:$sum、$avg、$min、$max、$push、$addToSet、$first、$last

    }},

    {$unwind:}, // 展开数组

    {$project:{  // $project 对应 字段 as

      "名称":'$product_name',

      '价格':'$product_price'

    }}

  ]

)


//aggregate实例

db.products.aggregate([{$group:{_id:"$product_id",num_field:{$sum:"$product_price"}}}]) // 分组,字段有个$符号

微信图片_20230604114604.png


//map-reduce:计算模型

db.collection.mapReduce(

  function() {

    emit(key,valqe);

  }, # map 函数

  function(key,values){

    return reduceFunction;

  }, # reduce 函数

  {

    out:collection,

    query: document,

    sort: document,

    limit: number

  }

)


//第一个函数表示map,第二个函数表示reduce,第三个参数 query:表示查询条件 out:表示输入结果

db.collection.mapReduce(

  function() {

    emit(this.name, this.score);

  },

  function(key,values) {

    return Array.sum(values);

  },

  {

    query:{Leve1:"A"},

    out:"total_score"

  }

)


//更新:updateOne、updateMany

db.collection.updateOne(query,update,{upsert:false,multi:false,writeConcern:false});

upsert: 可选,这个参数的意思是,如果不存在update的记录,是否插入objNew,true为插入,默认是false,不插入

multi: 可选,mongodb 默认是false,只更新找到的第一条记录,如果这个参数为true,就把按条件查出来多条记录全部更新

writeConcern: 可选,抛出异常的级别


//批量写入、更新操作:db.collection.bulkwrite

db.collection.bulkwrite(

  [

    insertOne:{

      "document":{}

    },

    insertMany:[{},{}],

    updateOne:{

      "filter":{},

      "update":{$set:{}}

    },

    updateMany:[{},{}]],

    deleteOne:{},

    deleteMany:[{},{}],

    replaceOne:{}

  ],

  {

    writeConcern: {}, // 可选,省略则使用默认的 write concern

    ordered: true // 可选,表示mongod实例有序还是无序执行操作。默认值true

  }

)


db.products.updateOne({"product_id":200},{$set: {"product_name":"学习Java基础"}}) // $set

db.products.updateOne({"product_id":123},{$inc: {"product_price":-10}}) // $inc

db.products.updateOne({"product_id":123},{$push: {"product_name":2}}) // $push 数组添加


//删除:deleteMany、deleteOne

db.products.deleteOne({"_id":"647b2a868383855d4a05be91"})

db.products.updateOne({"product_id":123},{$unset: {"product_name":1}}) // $unset 删除字段

db.products.updateOne({"product_id":123},{$pull: {"product_name":2}}) // $pull 数组删除


//导入:https://github.com/ozlerhakan/mongodb-json-files/blob/master/datasets/products.json

mongoimport --host localhost --port 27017 --db mydb --type json ./pro.json (也可以用jsonArray的形式导入)


//导出

mongoexport -h localhost -p 27017 -d mydb --c pro -o pro1.json

mongoexport -h localhost --port 27017 -d mydb -c pro -o pro1.json


//索引

db.products.getIndexes() // 获取索引

db.products.createIndex({"product_id":-1}) // 单字段索引,倒序

db.products.createIndex({"product_id":-1,"product_price":1}) // 复合索引

db.products.createIndex({"info.name":-1}) // 多键索引,数组内部字段

db.products.createIndex({"product_name":"text"},{backgroud:true}) // 全文索引,后台执行

db.products.find({$text:{$search:"PHP"}}) // 全文索引查询

db.products.createIndex({"loc":"2dsphere"}) // 地图、坐标 2dsphere 索引

db.products.createIndex({"product_name":1},{unique:true}) // 唯一索引

db.products.createIndex({"product_name":1},{expireAferSecs:60*60*24}) // 时间索引

db.products.createIndex({"age":1},{partialFilterExpression:{$gt:{"age":10}}}) // 局部索引,年纪索引,但是必须大于10岁

db.products.createIndex({"age":1},{sparse:true) // 稀疏索引,跳过不存在age的文档索引

db.products.dropIndex("product_name_text") // 删除索引,通过名称删除

db.products.dropIndex({"product_id":-1}) // 删除索引,通过索引ID删除


//mongodb复制集

进入一个节点作为主节点、复制集配置

config = [

 "_id":"mongodb-replicaset",

  "members":[

    {"_id":0, "host":"mongoset1:27017"},

    {"_id":1, "host":"mongoset2:27018"},

    {"_id":2, "host":"mongoset3:27019"

  ]}


//初始化配置

rs.initiate(config)


//查看复制集配置

rs.config()


//mongodb事务-写入

writeConcern决定一个写操作落在多少个节点上才算成功,writeConcern的取值包括

        0: 发起写操作,不关心是否成功

        1-n: 集群最大数据节点数: 写操作需要复制到指定节点数才算成功

        majority: 写操作需要复制到大多数节点数才算成功

        发起写操作的程序将阻塞到写操作到达指定的节点数为止

微信图片_20230604134200.png


虽然多于半数的writeConcern都是安全的,但通常只会设置majority,因为这是等待写入延迟时间最短的选择

不要设置writeConcern等于总节点数,因为一但有一个节点故障,所有写操作都将失败

writeConcern虽然会增加写操作的延迟时间,但并不会显著增加群集的压力,因此无论是否等待,写操作最终都会复制到所有的节点上。设置writeConcern只是让写操作等待复制后再返回而已;

应对重要的数据应用{w: majority},普通数据可以应用w:1确保最佳性能;


//mongodb事务-读取

readPreference决定使用哪一个节点来满足正在发起的读请求。可选值包括:

        primary:只选择主节点

        primaryPerferred:优先选择主节点,如何不可用则选择从节点

        seconday:只选择从节点

        secondayPerferred:优先选择从节点,如何不可用则选择主节点

        nearest:选择最近的节点

微信图片_20230604134447.png

db.test.find().readPref("primary") // 主节点读

db.test.find().readPref("secondary") # 从节点读


二、PHP操作类

1)、基础类

MongoDB\Driver\Manager 入口类,负责维护与MongoDB的连接,执行读写和命令

MongoDB\Driver\BulkWrite 收集要发送到服务器的一个或多个写操作

MongoDB\Driver\Query 构造查询对象

MongoDB\Driver\Cursor 封装MongoDB命令或查询的结果

MongoDB\Driver\WriteResult 封装执行结果

MongoDB\Driver\Exception\BulkWriteException 写入异常

MongoDB\BSON\ObjectId 生成_id


2)、代码

<?php

use MongoDB\Driver\Manager;

use MongoDB\Driver\BulkWrite;

use MongoDB\Driver\Exception\BulkWriteException;

use MongoDB\Driver\Query;

use MongoDB\BSON\ObjectId;


//表

const TABLE = "mydb.products";

/**

* 输出打印

* @param $data

* @return void

*/

function P($data = []){

        if(is_array($data) || is_object($data)){

                echo "<pre>";

                print_r($data);

                echo "</pre>\n";

        } else {

                echo $data, PHP_EOL;

        }

}


/**

* 写入

* @return void

*/

function create(){

        // 连接mongodb

        $manager = new Manager("mongodb://localhost:27017");


        //bulk实例

        $bulk = new BulkWrite();

        $bulk->insert(['product_id' => 123, 'product_name' => 'Gregory 格里高利登山包B75蓝色新款', 'product_price' => 2139.00]);

        $bulk->insert(['product_id' => 124, 'product_name' => 'Osprey 小鹰登山包Aether AG苍穹 70L蓝色', 'product_price' => 2099.00]);

        $bulk->insert(['product_id' => 125, 'product_name' => 'GRANITE GEAR 花岗岩登山包光环追踪85L红色', 'product_price' => 1800]);

        $bulk->insert(['product_id' => 126, 'product_name' => 'ARCTERYX 始祖鸟登山包Bora AR 50L婆罗洲蓝', 'product_price' => 4450.00]);

        $bulk->insert(['product_id' => 128, 'product_name' => 'PHP 是世界上最棒的语言', 'product_price' => 8139.00]);

        try {

                $result = $manager->executeBulkWrite(TABLE, $bulk);

                P($result->getInsertedCount());

        } catch (BulkWriteException $e){

                P($e->getWriteResult()->getWriteErrors());

        }

}


/**

* 删除

* @return void

*/

function delete(){

        //连接mongodb

        $manager = new Manager("mongodb://localhost:27017");


        //bulk实例

        $bulk = new BulkWrite();

        $bulk->delete(["product_id" => 126]);

        try {

                $result = $manager->executeBulkWrite(TABLE, $bulk);

                P($result->getDeletedCount());

        } catch (BulkWriteException $e) {

                P($e->getWriteResult()->getWriteErrors());

        }

}


/**

* 修改

* @return void

*/

function update(){

        //连接mongodb

        $manager = new Manager("mongodb://localhost:27017");


        //bulk实例

        $bulk = new BulkWrite();

        $id = new ObjectId("647b2a868383855d4a05be92");

        $bulk->update(["_id" => $id], ['$set' => ["product_id" => 100, "product_name" => "GoLang"]]);

        try {

                $result = $manager->executeBulkWrite(TABLE, $bulk);

                P($result->getModifiedCount());

        } catch (BulkWriteException $e) {

                P($e->getWriteResult()->getWriteErrors());

        }

}


/**

* 查询

* @return void

*/

function get(){

        //连接mongodb

        $manager = new Manager("mongodb://localhost:27017");


        //查询实例

        $filter = [

                "product_id" => ['$gt' => 123]

        ];

        $options = [

                "projection" => ["_id" => 0, "product_id" => 1, "product_price" => 1], //0-排除,1-取出

                "sort" => ["product_id" => -1] // -1-倒序,1-正序

        ];

        $query = new Query($filter, $options);

        $cursor = $manager->executeQuery(TABLE, $query);

        foreach ($cursor as $key => $data){

                P($data);

        }

}


分页查询:https://www.php.net/manual/zh/mongodb-driver-manager.executequery

$cursor = $Manager->executeQuery(TABLE, new Query($query, ["skip" => 1,"limit" => 3, "sort" => array("_id" => -1), 'projection'=> $projection ? array_fill_keys($projection, 1) : []]), $Session->options());


3)、mongodb事务:https://www.cnblogs.com/luoyunfei99/articles/17225827.html

经过查阅,发现当MongoDB为单例运行时是无法完成回滚操作的,只有在开启副本集的时候才能使用。

解决方案

Mongodb配置文件

        replication:

                oplogSizeMB: 2000

                replSetName: rs0

                enableMajorityReadConcern: true

Mongodb集群初始化

连接进入mongo,使用rs.initiate()初始化副本集。


public function mongodbtest(){

        //连接 mongodb

        $manager = new \MongoDB\Driver\Manager('mongodb://172.16.100.191:27017');


        //开启session

        //causalConsistency  boolean  因果一致性 如果操作依赖先前的操作需要设置为true

        $session = $manager->startSession(['causalConsistency' => true]);


        //开启事务

        $session->startTransaction();


        //下面执行两个写操作

        //批量插入/删除/更新操作

        //ordered  有序操作(TRUE)在MongoDB服务器上串行执行,而无序操作(FALSE)以任意顺序发送到服务器,并且可以并行执行。

        $bulk1 = new \MongoDB\Driver\BulkWrite(['ordered' => true]);

        $bulk1->insert(['name' => 'insert', 'age' => 44]);

        $writeConcern1 = new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000);


        //执行批量插入/删除/更新操作

        //第一个参数 databaseName.collectionName

        //第二个参数 要执行的操作

        $manager->executeBulkWrite('huaxiang.user', $bulk1, ['session' => $session, 'writeConcern' => $writeConcern1]);


        $bulk2 = new \MongoDB\Driver\BulkWrite(['ordered' => true]);

        $bulk2->insert(['id' => 2]);

        $writeConcern2 = new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000);


        $manager->executeBulkWrite('huaxiang.coin', $bulk2, ['session' => $session, 'writeConcern' => $writeConcern2]);


        //提交事务

        //$session->commitTransaction();

        //事务回滚

        $session->abortTransaction();

}


参考:

MongoDB\Driver\Manager::executeQuery:https://www.php.net/manual/zh/mongodb-driver-manager.executequery

PHP7使用MongoDB\Driver\Manager类实现增删改查:https://blog.csdn.net/weixin_43910923/article/details/89606752

只袄早~~~
感谢你的支持,我会继续努力!
扫码打赏,感谢您的支持!

文明上网理性发言!

  • 还没有评论,沙发等你来抢