ldap3 官方文档学习之增删改查操作
前言
公司部门培训用到 ldap3,布置了个作业,于是开始看官方文档学习中。我是直接从 LDAP Operations 部分开始看的。
主要就是官方文档提供了增删改查的接口,需要看懂函数和参数,然后就会用了。
增加操作
官方 add 函数
def add(self, dn, object_class=None, attributes=None, controls=None)
逐个参数解释:
dn:标识要添加的目标名字
object_class:要添加的标志类名称,可以是包含一个单一值或一串字符串
attributes:一个以 {‘attr1’: ‘val1’, ‘attr2’: ‘val2’, …} or {‘attr1’: [‘val1’, ‘val2’, …], …} 多值形式的字典
controls:发送请求额外的信息
举例
# import class and constants from ldap3 import Server, Connection, ALL # define the server s = Server('servername', get_info=ALL) # define an unsecure LDAP server, requesting info on DSE and schema # define the connection c = Connection(s, user='user_dn', password='user_password') # perform the Add operation c.add('cn=user1,ou=users,o=company', ['inetOrgPerson', 'posixGroup', 'top'], {'sn': 'user_sn', 'gidNumber': 0}) # equivalent to 等同上面 c.add('cn=user1,ou=users,o=company', attributes={'objectClass': ['inetOrgPerson', 'posixGroup', 'top'], 'sn': 'user_sn', gidNumber: 0}) print(c.result) # close the connection c.unbind()
主要就是 add 函数传三个参数:dn、object_class、attributes。
- dn 包含用户cn、ou、o等信息
删除操作
官方 delete 函数
def delete(self, dn, controls=None):
逐个参数解释:
- dn:标识要删除的目标名字
- controls:发送请求额外的信息
举例
from ldap3 import Server, Connection, ALL # define the server s = Server('servername', get_info=ALL) # define an unsecure LDAP server, requesting info on DSE and schema # define the connection c = Connection(s, user='user_dn', password='user_password') # perform the Delete operation c.delete('cn=user1,ou=users,o=company') print(c.result) # close the connection c.unbind()
主要就是 delete 函数传一个参数:dn。
- dn 包含用户cn、ou、o等信息
修改操作
官方 modify 函数
def modify(self, dn, changes, controls=None):
逐个参数解释:
- dn:标识要删除的目标名字
- changes:一个要被展示在具体入口的修改的字典
- controls:发送请求额外的信息
举例
# import class and constants from ldap3 import Server, Connection, ALL, MODIFY_REPLACE # define the server s = Server('servername', get_info=ALL) # define an unsecure LDAP server, requesting info on DSE and schema # define the connection c = Connection(s, user='user_dn', password='user_password') c.bind() # perform the Modify operation c.modify('cn=user1,ou=users,o=company', {'givenName': [(MODIFY_REPLACE, ['givenname-1-replaced'])], 'sn': [(MODIFY_REPLACE, ['sn-replaced'])]}) print(c.result) # close the connection c.unbind()
主要就是 modify 函数传两个参数:dn、changes。
- dn 包含用户cn、ou、o等信息
- changes 包含修改前的数据类别和修改后的数据
查询操作
官方 search 函数
def search(self, search_base, search_filter, search_scope=SUBTREE, dereference_aliases=DEREF_ALWAYS, attributes=None, size_limit=0, time_limit=0, types_only=False, get_operational_attributes=False, controls=None, paged_size=None, paged_criticality=False, paged_cookie=None):
逐个参数解释:
- search_base:查询请求的基础
- search_filter:查询请求的过滤器,必须服从 LDAP 过滤语法 RFC4515 标准
- search_scope:具体指定查询内容的部分
- BASE:查询 search_base 中指定的条目的属性。
- LEVEL:查询 search_base 中包含的条目的属性。基对象必须引用一个容器对象。
- SUBTREE:向下查询 search_base 和所有附属容器中指定的条目的属性。
- attributes:查询返回的单个属性或属性列表(默认为None)。如果属性为None,则不返回任何属性。如果属性是 ALL_ATTRIBUTES 或 ALL_OPERATIONAL_ATTRIBUTES,则返回所有用户属性或所有操作属性。
举例
from ldap3 import Server, Connection, SUBTREE total_entries = 0 server = Server('test-server') c = Connection(server, user='username', password='password') c.search(search_base = 'o=test', search_filter = '(objectClass=inetOrgPerson)', search_scope = SUBTREE, attributes = ['cn', 'givenName'], paged_size = 5) total_entries += len(c.response) for entry in c.response: print(entry['dn'], entry['attributes']) cookie = c.result['controls']['1.2.840.113556.1.4.319']['value']['cookie'] while cookie: c.search(search_base = 'o=test', search_filter = '(objectClass=inetOrgPerson)', search_scope = SUBTREE, attributes = ['cn', 'givenName'], paged_size = 5, paged_cookie = cookie) total_entries += len(c.response) cookie = c.result['controls']['1.2.840.113556.1.4.319']['value']['cookie'] for entry in c.response: print(entry['dn'], entry['attributes']) print('Total entries retrieved:', total_entries)
主要就是 search 函数传四个参数:search_base、search_filter、search_scope、attributes。
- search_base:一般就是之前三个操作都要用到的 dn
- search_filter:默认为 '(objectClass=inetOrgPerson)'
- search_scope:我用的是 SUBTREE
- attributes:我用的是 ALL_ATTRIBUTES
增删改查完整版代码
增删改查操作都是从 Excel 表格中读取数据的。
# coding: utf8 import sys import json from ldap3 import Connection, Server, ALL, MODIFY_ADD, MODIFY_REPLACE, SUBTREE, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES import utils from config import BASE_DN, LDAP_TEST_CONFIG, LDAP_PROD_CONFIG, GROUP_DNS host = "xxx.xxx.xxx.xxx" port = xxx user = "cn=xxx,dc=xx,dc=xxx" password = "xxx" # 创建连接 server = Server(host=host, port=port, get_info=ALL) conn = Connection(server=server, auto_bind=True, read_only=False, fast_decoder=True, check_names=True, user=user, password=password) # 检查连接是否成功 def test_connection(): print(server.info) print(conn.user) print(conn.extend.standard.who_am_i()) # 获取用户 def get_users(): conn.search(search_base="dc=xxx,dc=xxx", attributes=ALL_ATTRIBUTES, search_filter='(objectclass=person)') print(conn.result) res = conn.response_to_json() res = json.loads(res)['entries'] return res print("===================测试链接====================") test_connection() print("\n\n\n===================打印用户====================") print(get_users()) ldap_config = {} excel_file_path = "ldap-users-example.xlsx" ldap_user_excel_file = excel_file_path # 批量添加组织人员 def add_users(): # 读取用户excel信息 userattrs = utils.generate_ldap_userattrs(ldap_user_excel_file) # print(userattrs) print("\n\n\n===================开始添加用户========================") for user_dn, userattr in userattrs.items(): # 添加用户 conn.add(user_dn, ['top', 'inetOrgPerson', 'posixAccount'], userattr) res = conn.result # 用户已存在 if res['result'] != 0: msg = res['description'] print("add user failed:%s res: %s" % (user_dn, res)) continue # 添加用户到用户组,直接添加到 cn = xx组中,没有这个操作的话就是不添加到 cn 中 for GROUP_DN in GROUP_DNS: conn.modify(GROUP_DN, {'uniqueMember': [(MODIFY_ADD, user_dn)]}) # print("====modify result=====") res = conn.result msg = "success" if res['result'] != 0: msg = res['message'] print("add user to group:%s res: %s" % (GROUP_DN, msg)) continue print(user_dn, "success") print("===================添加用户完毕========================") # 批量删除组织人员 def delete_users(): # 读取用户excel信息 userattrs = utils.generate_ldap_userattrs(ldap_user_excel_file) # print(userattrs) print("\n\n\n===================开始删除用户========================") for user_dn, userattr in userattrs.items(): # 删除用户 conn.delete(user_dn) res = conn.result print(user_dn, "success") print("===================删除用户完毕========================") # 修改人员信息 def modify_users(): # 读取用户excel信息 userattrs = utils.generate_ldap_userattrs(ldap_user_excel_file) # print(userattrs) print("\n\n\n===================开始修改用户信息========================") for user_dn, userattr in userattrs.items(): # 修改用户部门和工作地点 conn.modify(user_dn, {'physicalDeliveryOfficeName': [(MODIFY_REPLACE, ['武汉研发组'])], 'l': [(MODIFY_REPLACE, ['武汉'])]}) res = conn.result print(user_dn, "success") print("===================修改用户信息完毕========================") # 查询人员信息 def search_users(): # 读取用户excel信息 userattrs = utils.generate_ldap_userattrs(ldap_user_excel_file) # print(userattrs) print("\n\n\n===================开始查询用户信息========================") for user_dn, userattr in userattrs.items(): # 查询用户 print(user_dn) status = conn.search(search_base = user_dn, search_filter = '(objectClass=inetOrgPerson)', search_scope = SUBTREE, attributes = ALL_ATTRIBUTES) if status: print(user_dn, "success") else: print(user_dn, "failed") print("===================查询用户信息完毕========================") test_connection() get_users() search_users() add_users() delete_users() modify_users() search_users() # close the connection conn.unbind()