Python连接各类数据库
1.连接Oracle数据库
安装cx_Oracle模块 pip install cx_Oracle
import cx_Oracle import os import json os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8' #保证select出来的中文显示没有问题 class Oracle(object): def __init__(self): # self.connect = cx_Oracle.connect(user + "/" + pwd + "@" + ip + ":" + port + "/" + sid) self.connect = cx_Oracle.connect('user /pwd @127.0.0.1:1521/orcl') # 这里的顺序是 用户名/密码@oracleserver的ip地址/数据库实例名 self.cursor = self.connect.cursor() # 使用cursor()方法获取操作游标 def GetData(self,sql): self.cursor.execute(sql) # 使用Rowfactory更改查询结果,更直观查看数据 columns = [col[0] for col in self.cursor.description] self.cursor.rowfactory = lambda *args: dict(zip(columns, args)) # fetchall()一次取完所有结果 # fetchone()一次取一行结果 data = self.cursor.fetchall() return data def select(self, sql): #查询 list = [] self.cursor.execute(sql) # 使用execute方法执行SQL语句 result = self.cursor.fetchall() # fetchall()一次取完所有结果,fetchone()一次取一行结果 col_name = self.cursor.description for row in result: dict = {} for col in range(len(col_name)): key = col_name[col][0] value = row[col] dict[key] = value list.append(dict) js = json.dumps(list, ensure_ascii=False, indent=2, separators=(',', ':')) #json.dumps() 是把python对象转换成json对象的一个过程,生成的是字符串。 #json.dump() 是把python对象转换成json对象生成一个fp的文件流,和文件相关。 return js #将结果返回为一个字符串 def selectlist(self,sql): list = [] self.cursor.execute(sql) result = self.cursor.fetchall() col_name = self.cursor.description for row in result: dict = {} for col in range(len(col_name)): key = col_name[col][0] value = row[col] dict[key] = value list.append(dict) return list #将结果以列表返回,若需要获取数据,for循环下 def disconnect(self): #未连接 self.cursor.close() self.connect.close() def insert(self, sql, list_param): #插入 try: self.cursor.executemany(sql, list_param) self.connect.commit() print("插入ok") except Exception as e: print(e) finally: self.disconnect() def update(self, sql): #更新 try: self.cursor.execute(sql) self.connect.commit() except Exception as e: print(e) finally: self.disconnect() def delete(self, sql): #删除 try: self.cursor.execute(sql) self.connect.commit() print("delete ok") except Exception as e: print(e) finally: self.disconnect() if __name__ == '__main__': pass
2.连接Mysql数据库
安装pymysql模块 pip install pymysql
import pymysql class Mysql(object): def __init__(self, host, user, password, port): #初始化, 构造函数 self.db = pymysql.connect(host=host, user=user, password=password,port=port, charset='utf8') self.cursor = self.db.cursor() def exec_data(self, sql, data=None): # 将要插入的数据写成元组传入 try: self.cursor.execute(sql, data) # 执行SQL语句 self.db.commit() # 提交到数据库执行 except Exception as error: print(error) def exec(self, sql): # sql拼接时使用repr(),将字符串原样输出 try: self.cursor.execute(sql) self.db.commit() except Exception as error: print(error) def select(self,sql): try: self.cursor.execute(sql) desc = self.cursor.description # 获取字段的描述,默认获取数据库字段名称,重新定义时通过AS关键重新命名即可 for data_dict in [dict(zip([col[0] for col in desc], row)) for row in self.cursor.fetchall()]: #将表字段与表数据以字典输出 return data_dict except Exception as error: print(error) def fetchall(self, sql): # 查询所有数据 try: self.cursor.execute(sql) return self.cursor.fetchall() except Exception as error: print(error) def fetchmany(self, sql, size=1): # 查询多条数据 try: self.cursor.execute(sql) return self.cursor.fetchmany(size) except Exception as error: print(error) def __del__(self): self.cursor.close() self.db.close() if __name__ == '__main__': pass
3.连接Mongo数据库
安装pymongo模块 pip install pymongo
# -*- coding: UTF-8 -*- from pprint import pprint from pymongo import MongoClient class MongoDB(object): def Mongodbconn(self, host, database, table): # host数据库地址,database数据库,table表(或者集合) global client client = MongoClient(host, 27017) # 建立Mongodb数据库连接,27017为数据库端口 self.db = client[database] # 等同于在linux命令中切换到对应的库 self.collection = self.db[f'{table}'] # table为集合,相当于表名 def Mongodbfind(self, host, database, table,field,value): # 查询函数,field是表字段,value为表某一字段的值 self.Mongodbconn(host, database, table) # 调用Mongodbconn函数 # onedata = self.collection.find_one({}) # 查找集合(表)中第一条数据 res = self.collection.find_one({f'{field}': f"{value}"}) #查询表中某个字段值对应的所有信息,将字段值变量格式化 pprint(res) def Mongodbfound(self, host, database, table,field,value): # 查找某个字段的所有值 self.Mongodbconn(host, database, table) for item in self.collection.find({f'{field}':{"$in":[f"{value}"]}}): print(item) def Mongodbupdate(self, host, database, table): # 更新某字段所有数据 self.Mongodbconn(host, database, table) ''' multi: 布尔类型, 设置数据更新时是否一次性更新多条数据, 默认为False upsert: 设置数据更新时,如果数据不存在,是否将本次数据添加到文件中,默认为False ''' self.collection.update({"iss_bgn_dt": "1"}, {'$set': {"iss_bgn_dt": "2"}}, multi=True,upsert=True) # 更新集合(表)里iss_bgn_dt的数据,由原始值1变为后来值2 def Mongodbinsert(self, host, database, table,sql): # 向库中某集合(表)插入数据 self.Mongodbconn(host, database, table) ''' insert():可以实现单条或多条数据的插入 save():只能完成单条数据的插入,并且数据必须是字典结构 ''' #sql是需要插入的语句 self.collection.insert(sql) def Mongodbremove(self, host, database, table): # 删除库中某集合(表)数据 self.Mongodbconn(host, database, table) ''' delete_one(): 删除数据中一条数据 delete_many(): 一次性删除多条满足的数据 ''' self.collection.delete_many({'iss_bgn_dt': '20290626'}) # 删除集合collection中的所有iss_bgn_dt等于20190626的数据 if __name__ == '__main__': pass #a = MongoDB() #a.Mongodbfind(host='库地址',database='库名',table ='集合(表名)',ACCT_CODE='表中字段值') #ACCT_CODE表字段 #a.Mongodbupdate(host='库地址',database='库名',table ='集合(表名)') #a.Mongodbinsert(host='库地址',database='库名',table ='集合(表名)') #a.Mongodbremove(host='库地址',database='库名',table ='集合(表名)')