Python连接各类数据库
1.连接Oracle数据库
安装cx_Oracle模块 pip install cx_Oracle
import cx_Oracle
import os
import json
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8' #保证select出来的中文显示没有问题
class Oracle(object):
def __init__(self):
# self.connect = cx_Oracle.connect(user + "/" + pwd + "@" + ip + ":" + port + "/" + sid)
self.connect = cx_Oracle.connect('user /pwd @127.0.0.1:1521/orcl')
# 这里的顺序是 用户名/密码@oracleserver的ip地址/数据库实例名
self.cursor = self.connect.cursor() # 使用cursor()方法获取操作游标
def GetData(self,sql):
self.cursor.execute(sql)
# 使用Rowfactory更改查询结果,更直观查看数据
columns = [col[0] for col in self.cursor.description]
self.cursor.rowfactory = lambda *args: dict(zip(columns, args))
# fetchall()一次取完所有结果
# fetchone()一次取一行结果
data = self.cursor.fetchall()
return data
def select(self, sql): #查询
list = []
self.cursor.execute(sql) # 使用execute方法执行SQL语句
result = self.cursor.fetchall() # fetchall()一次取完所有结果,fetchone()一次取一行结果
col_name = self.cursor.description
for row in result:
dict = {}
for col in range(len(col_name)):
key = col_name[col][0]
value = row[col]
dict[key] = value
list.append(dict)
js = json.dumps(list, ensure_ascii=False, indent=2, separators=(',', ':'))
#json.dumps() 是把python对象转换成json对象的一个过程,生成的是字符串。
#json.dump() 是把python对象转换成json对象生成一个fp的文件流,和文件相关。
return js #将结果返回为一个字符串
def selectlist(self,sql):
list = []
self.cursor.execute(sql)
result = self.cursor.fetchall()
col_name = self.cursor.description
for row in result:
dict = {}
for col in range(len(col_name)):
key = col_name[col][0]
value = row[col]
dict[key] = value
list.append(dict)
return list #将结果以列表返回,若需要获取数据,for循环下
def disconnect(self): #未连接
self.cursor.close()
self.connect.close()
def insert(self, sql, list_param): #插入
try:
self.cursor.executemany(sql, list_param)
self.connect.commit()
print("插入ok")
except Exception as e:
print(e)
finally:
self.disconnect()
def update(self, sql): #更新
try:
self.cursor.execute(sql)
self.connect.commit()
except Exception as e:
print(e)
finally:
self.disconnect()
def delete(self, sql): #删除
try:
self.cursor.execute(sql)
self.connect.commit()
print("delete ok")
except Exception as e:
print(e)
finally:
self.disconnect()
if __name__ == '__main__':
pass
2.连接Mysql数据库
安装pymysql模块 pip install pymysql
import pymysql
class Mysql(object):
def __init__(self, host, user, password, port): #初始化, 构造函数
self.db = pymysql.connect(host=host, user=user, password=password,port=port, charset='utf8')
self.cursor = self.db.cursor()
def exec_data(self, sql, data=None): # 将要插入的数据写成元组传入
try:
self.cursor.execute(sql, data) # 执行SQL语句
self.db.commit() # 提交到数据库执行
except Exception as error:
print(error)
def exec(self, sql): # sql拼接时使用repr(),将字符串原样输出
try:
self.cursor.execute(sql)
self.db.commit()
except Exception as error:
print(error)
def select(self,sql):
try:
self.cursor.execute(sql)
desc = self.cursor.description # 获取字段的描述,默认获取数据库字段名称,重新定义时通过AS关键重新命名即可
for data_dict in [dict(zip([col[0] for col in desc], row)) for row in self.cursor.fetchall()]: #将表字段与表数据以字典输出
return data_dict
except Exception as error:
print(error)
def fetchall(self, sql): # 查询所有数据
try:
self.cursor.execute(sql)
return self.cursor.fetchall()
except Exception as error:
print(error)
def fetchmany(self, sql, size=1): # 查询多条数据
try:
self.cursor.execute(sql)
return self.cursor.fetchmany(size)
except Exception as error:
print(error)
def __del__(self):
self.cursor.close()
self.db.close()
if __name__ == '__main__':
pass
3.连接Mongo数据库
安装pymongo模块 pip install pymongo
# -*- coding: UTF-8 -*-
from pprint import pprint
from pymongo import MongoClient
class MongoDB(object):
def Mongodbconn(self, host, database, table): # host数据库地址,database数据库,table表(或者集合)
global client
client = MongoClient(host, 27017) # 建立Mongodb数据库连接,27017为数据库端口
self.db = client[database] # 等同于在linux命令中切换到对应的库
self.collection = self.db[f'{table}'] # table为集合,相当于表名
def Mongodbfind(self, host, database, table,field,value): # 查询函数,field是表字段,value为表某一字段的值
self.Mongodbconn(host, database, table) # 调用Mongodbconn函数
# onedata = self.collection.find_one({}) # 查找集合(表)中第一条数据
res = self.collection.find_one({f'{field}': f"{value}"}) #查询表中某个字段值对应的所有信息,将字段值变量格式化
pprint(res)
def Mongodbfound(self, host, database, table,field,value): # 查找某个字段的所有值
self.Mongodbconn(host, database, table)
for item in self.collection.find({f'{field}':{"$in":[f"{value}"]}}):
print(item)
def Mongodbupdate(self, host, database, table): # 更新某字段所有数据
self.Mongodbconn(host, database, table)
'''
multi: 布尔类型, 设置数据更新时是否一次性更新多条数据, 默认为False
upsert: 设置数据更新时,如果数据不存在,是否将本次数据添加到文件中,默认为False
'''
self.collection.update({"iss_bgn_dt": "1"}, {'$set': {"iss_bgn_dt": "2"}}, multi=True,upsert=True)
# 更新集合(表)里iss_bgn_dt的数据,由原始值1变为后来值2
def Mongodbinsert(self, host, database, table,sql): # 向库中某集合(表)插入数据
self.Mongodbconn(host, database, table)
'''
insert():可以实现单条或多条数据的插入
save():只能完成单条数据的插入,并且数据必须是字典结构
'''
#sql是需要插入的语句
self.collection.insert(sql)
def Mongodbremove(self, host, database, table): # 删除库中某集合(表)数据
self.Mongodbconn(host, database, table)
'''
delete_one(): 删除数据中一条数据
delete_many(): 一次性删除多条满足的数据
'''
self.collection.delete_many({'iss_bgn_dt': '20290626'})
# 删除集合collection中的所有iss_bgn_dt等于20190626的数据
if __name__ == '__main__':
pass
#a = MongoDB()
#a.Mongodbfind(host='库地址',database='库名',table ='集合(表名)',ACCT_CODE='表中字段值') #ACCT_CODE表字段
#a.Mongodbupdate(host='库地址',database='库名',table ='集合(表名)')
#a.Mongodbinsert(host='库地址',database='库名',table ='集合(表名)')
#a.Mongodbremove(host='库地址',database='库名',table ='集合(表名)')
