pymongo
from pymongo import *
class TestMongo(object):
'''
初始化函数
'''
def __init__(self,ip,port):
self.clinet = MongoClient(ip, port)
self.db=self.clinet["mytest"].plan
pass
def add_one(self):
import time
post={
"title":'标题',
"content":'内容描述',
"creatTime":time.time()
}
res=self.db.insert_one(post)
return res
pass
def select_data(self,where):
'''
查询数据
:param where:
:return:
'''
return self.db.find(where)
pass
def update(self,where,content):
self.db.update(where,{"$set":content})
# print(__name__)
if __name__=="__main__":
obj=TestMongo('192.168.136.134',27017)
obj.add_one()
print('添加成功....')
rs=obj.select_data({})
for item in rs:
print(item)
# 建立连接对象
# try:
# clinet=MongoClient('192.168.136.134',27017)
# #创建数据库
# db=clinet.mytest
# colTable=db.plan
# # 插入一条数据
# # colTable.insert_one({'name':'狄仁杰','age':18,'gender':1})
# # 插入多条数据
# # colTable.insert_many([
# # {'name': '狄仁杰1', 'age': 18, 'gender': 1},
# # {'name': '狄仁杰2', 'age': 18, 'gender': 1},
# # {'name': '狄仁杰3', 'age': 18, 'gender': 1},
# # {'name': '狄仁杰4', 'age': 18, 'gender': 1},
# # {'name': '狄仁杰5', 'age': 18, 'gender': 1},
# # {'name': '狄仁杰6', 'age': 18, 'gender': 1},
# # ])
# # 更新一条
# # colTable.update_one({'name': '狄仁杰1'},{"$set":{'age':20}})
# # //更新多条
# # colTable.update_many({'gender': 1}, {"$set": {'age': 20}})
# # 删除多条
# # colTable.delete_many({'gender': 1})
# # 查询的使用
# # res=colTable.find({"name":"狄仁杰1"})
# res = colTable.find_one({"name": "狄仁杰1"})
# print(res)
# # for item in res:
# # print(item)
# # print('sucess')
# except Exception as ex:
# print(ex)
pymysql
# 安装一个mysql中间模块 pip install pymysql
from pymysql import *
class TestMmysql(object):
'''
初始化函数
'''
def __init__(self,address,user,pwd,database):
res = address.split(":")
self.clinet = conn = connect(host=res[0], user=user, password=pwd, database=database, port=res[1], charset='utf8')
self.sql=conn.cursor() #创建游标
pass
def add_one(self):
import time
post="insert into tb_student(title,content,creaTime) values('标题','内容描述',{})".format(time.time())
self.sql.execute(post)
self.sql.commit()
return "success"
pass
def select_data(self,where):
'''
查询数据
:param where:
:return:
'''
res=sql.execute('select * from {}'.format(where))
sql.fetchall()
return res
pass
def update(self,where,content):
self.sql.execute('update tb_student set content={} where creaTime={}'.format(content,where))
self.sql.commit()
# print(__name__)
if __name__=="__main__":
obj=TestMmysql('192.168.136.134',3389)
try:
obj.add_one()
print('添加成功....')
rs=obj.select_data('tb_student')
for item in rs:
print(item)
except Exception as ex:
print(ex)
finally:
sql.close()
conn.close()
redis
# 安装一个redis中间模块 pip install redis
from redis import *
import time
try:
# 首先创建一个StrictRedis对象,便可以使用这个对象来操作redis数据库了
# 参数 host:redis服务器的ip地址
# port 6379
# password 根据配置的需要 可选的操作
# db:[0-15]
redisObj=StrictRedis(host="192.168.136.134",port=6379,db=1,password='111111')
print("连接redis服务器成功")
redisObj.set('hobby','打篮球') 添加字符串类型的数据
print(redisObj.get('hobby'))
# 添加多个键值对
redisObj.mset({'name1':'AA','name2':'BB','addr':'china.shenzhen'})
rs=redisObj.mget('name1','name2','addr')
print(rs)
# 设定超时时间
redisObj.setex('class',5,'three.class')
print(redisObj.get('class'))
time.sleep(7)
print(redisObj.get('class'))
print(redisObj.keys())
redisObj.delete('name1','name2')
print(redisObj.keys())
# list 列表的使用
redisObj.lpush('class','liyang','weo','fly')
rs=redisObj.rpush('class', 'lifei88888')
redisObj.lrem('class',1,'lifei88888') #删除一个数据
redisObj.lset('class',3,555) #对索引对应的值进行修改
result=redisObj.lrange('class',0,-1)
print(redisObj.llen('class'))
for index in range(3):
print(redisObj.lpop('class')) #返回并删除键为 class 的列表中的首元素
print(result)
redisObj.lpush('AA', '1', 2)
print(redisObj.keys())
redisObj.delete('class')
except Exception as ex:
print(ex)