首先,在等级防护评估要求中,应提供数据的本地备份机制,每晚本地备份软件备份工具,异地存储,包括主要网络设备配置文件。
其次,对于日常网络运维来说,为了审计和避免因设备故障而导致网络长期故障导致设备配置丢失,需要对网络设备的配置进行策略备份。
开源软件
开源软件场景:跨平台、支持多厂商系统、UI界面友好、功能强大、备份策略可定制。
自写脚本
一般现在网络常用的脚本有python脚本和GO脚本,其中Python脚本比较适用。
自写脚本场景:定制性强,适应性高,可根据自身需求不断优化迭代。
Python脚本备份需求描述:
公司内部企业网络交换机、防火墙等配套设备需要配置备份,必须满足以下要求:
思路拓展:
1. 如何先支持自己的想法?
无非是执行命令查看全局配置,然后抓取回显并保存为配置文件。 TFTP/FTP登录网元复制配置文件,或者网元作为客户端。 FTP/TFTP建立在Linux/Windows主机上。 现在很多厂商的新设备都支持手动备份配置命令,这也是一种方法。
2. 多厂商切换系统支持。 如果使用FTP,则不需要考虑太多,因为基本上每个厂家都支持FTP。
3、每个设备的密码不同,需要对密码进行加密和解密。 执行命令时不会直接显示明文密码。
4、备份成功后,需要通过阿里云邮箱发送短信通知。
5. 如需自定义备份策略,需要单独启用该模块。
开源参考: 自行实现:
1、前期完成基本需求,很快会在测试环境中使用进行测试,继续完善软件备份工具,后续应用到生产环境中。
2、中期考虑如何结合WebUI进行监控和操作。
3、后期可以考虑生成一个比较完善的网络设备备份系统平台,实现整体闭环,并开源到Github和Gitee。
程序示例 网络设备配置备份程序 程序结构说明
当前目录依赖包生成:
pip install pipreqs
pipreqs . --encoding=utf8 --force
目录结构环境说明网络设备支持FTP服务器
UniversalFTPServer(可以直接从Microsoft Store下载)
通用FTP服务器
程序执行结果说明
程序会手动将网络设备配置文件备份到服务器根目录。
代码示例
例子
base-info:
ftp_password: '123456'
ftp_server: 10.10.10.2
ftp_user: python
is_encrypt: '1'
pandas==1.5.3
paramiko==3.0.0
pycryptodome==3.17
PyYAML==6.0
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Linux下和Windows下安装pycryptodome 把读取的xlsx文档的密码进行加密和解密处理
# pip install pycryptodome
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Util.Padding import unpad
from binascii import b2a_hex, a2b_hex
import re
class CryptPassword:
"""
AES.new 初始化AES对象
"""
def __init__(self, key, init_vector):
"""
:param key: 初始化AES加密密钥 AES.new传入参数必须是bytes类型
:param init_vector: 初始化iv偏移量 AES.new传入参数必须是bytes类型
"""
self.key = key
self.init_vector = init_vector
self.mode = AES.MODE_CBC
# self.cryptor = AES.new(self.key, self.mode, self.init_vector)
# AES加密 待加密的字符串为str类型并返回str类型数据
def encrypt_str(self, pass_str) -> str:
# encode('ascii') 字符串转字节串,把字符串:abc,转为字节串:b'abc'
cryptor = AES.new(self.key.encode('ascii'), self.mode, self.init_vector.encode('ascii'))
# pass_str str类型 pad的data_to_pad必须为bytes类型 故pass_str需要转换为字节串
padtext = pad(pass_str.encode('ascii'), 16, style='pkcs7')
# 调用加密方法
cipher_text = cryptor.encrypt(padtext)
# 字符串 --> 十六进制
str_cipher = b2a_hex(cipher_text)
return str_cipher.decode('ascii')
# AES解密 待解密的字符串为str类型并返回str类型数据
def decode_str(self, text_str) -> str:
if not CryptPassword.is_hex(text_str) or (len(text_str) < 32):
return "错误的16进制加密字符串"
else:
# 十六进制 --> 字符串
cipher_text = a2b_hex(text_str)
decrypter = AES.new(self.key.encode('ascii'), self.mode, self.init_vector.encode('ascii'))
# 调用解密方法
plain_text = decrypter.decrypt(cipher_text)
# 不够16位,补全到16位
try:
unpadtext = unpad(plain_text, 16, 'pkcs7')
# decode('ascii') 字节串转字符串,把字节串:b'abc',转为字符串:abc
return unpadtext.decode('ascii')
except ValueError:
return "请输入正确的加密字符串"
# 16进制检查
@staticmethod
def is_hex(hex_str) -> bool:
# 16进制的正则表达式
regex = "[A-Fa-f0-9]+$"
result = re.match(regex, hex_str)
if result:
return True
else:
return False
# 测试用例
if __name__ == '__main__':
key_word = 'QWErtyuio@098765'
init_vector_word = 'MEIYuanTian--Xia'
password = 'T5fx2L3aFt'
data = CryptPassword(key_word, init_vector_word)
print(data.encrypt_str(password))
print(CryptPassword.is_hex(data.encrypt_str(password)))
text = 'c0cf24e87b328810c0b5711b2f326c8f'
print(data.decode_str(text))
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
针对yaml/yml等配置文件进行读取和写入的实现类
"""
import yaml
import os
import pathlib
class YamlReadWrite:
# 初始化
def __init__(self, filepath):
self.filepath = filepath
def yaml_read(self, file_name):
# filename = str(file_name)
# print(filename)
if os.path.isfile(file_name):
with open(self.filepath + '\\' + file_name, 'r', encoding='utf-8') as fp:
line_data = yaml.load(fp, Loader=yaml.FullLoader)
return line_data
else:
print("配置文件不存在")
def yaml_write(self, file_name, encrypt_str):
# 读取yaml文件数据
if os.path.isfile(file_name):
old_file_data = YamlReadWrite(self.filepath).yaml_read(file_name)
# 修改读取数据
old_file_data['base-info']['is_encrypt'] = encrypt_str
with open(self.filepath + '\\' + file_name, 'w', encoding='utf-8') as fp:
yaml.dump(old_file_data, fp)
return "密码已加密,配置文件加密参数修改为0"
else:
print("配置文件不存在")
if __name__ == '__main__':
path = os.path.dirname(__file__)
read_data = YamlReadWrite(path).yaml_read(file_name='AutoBakCfg.yml')
print(read_data['base-info']['ftp_password'])
write_data = YamlReadWrite(path).yaml_write(file_name='AutoBakCfg.yml', encrypt_str='0')
print(write_data)
print("修改完成")
write_excels.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
利用pandas模块读写excel文本数据,并对文本数据中的密码进行判断,如果不是16进制字符串,那就进行加密并重新写入,读取16进制字符串解密进行ftp登录
依赖模块: pandas,openpyxl
"""
import pandas as pd
from password_aes import CryptPassword
class MachiningText:
def __init__(self, xlsx_file_name):
self.xlsx_file_name = xlsx_file_name
# 更新格式化后的密码并写入xlsx文件中
def write_xlsx_file(self, dev_name, dev_ip, dev_user, dev_pw, file_path):
data_df = pd.Dataframe()
data_df["设备名称"] = dev_name
data_df["设备IP"] = dev_ip
data_df["账号"] = dev_user
data_df["密码"] = dev_pw
data_df["文件路径"] = file_path
try:
writer = pd.ExcelWriter(self.xlsx_file_name)
data_df.to_excel(writer, sheet_name='dev_info', index=False)
writer.close()
except PermissionError:
print("请先关闭文档,再次尝试")
def update_password(self, key, init_vector):
dev_name = []
dev_ip = []
dev_user = []
dev_pw = []
file_path = []
dev_pd = pd.read_excel(self.xlsx_file_name, sheet_name='dev_info')
# 赋予i=0 密码已加密,i=1 把明文密码转为加密密码
i = 0
for line_str in dev_pd.values:
dev_name = dev_name + [line_str[0].strip()]
dev_ip = dev_ip + [line_str[1].strip()]
dev_user = dev_user + [line_str[2].strip()]
# try:
# dev_pw = dev_pw + [line_str[3].strip()]
# except AttributeError:
# dev_pw = dev_pw + [line_str[3]]
file_path = file_path + [line_str[4].strip()]
# 如果密码是单纯的数字 len(123)会出错,需要转换为str类型
try:
if int(line_str[3]):
password_len = len(str(line_str[3]))
passwd_str = str(line_str[3])
if not CryptPassword.is_hex(passwd_str) or (password_len < 32):
new_password = CryptPassword(key, init_vector).encrypt_str(passwd_str)
# print(new_password)
# print(dev_pw)
dev_pw = dev_pw + [new_password]
i = 1
elif str(line_str[3].strip()):
password_len = len(line_str[3].strip())
if not CryptPassword.is_hex(line_str[3].strip()) or (password_len < 32):
new_password = CryptPassword(key, init_vector).encrypt_str(line_str[3].encode('ascii'))
dev_pw = dev_pw + [new_password]
i = 1
else:
return "不存在的类型"
except ValueError:
# 密码不是16进制,则进行加密
password_len = len(line_str[3].strip())
if (not CryptPassword.is_hex(line_str[3].strip())) or (password_len < 32):
new_password = CryptPassword(key, init_vector).encrypt_str(line_str[3].strip())
dev_pw = dev_pw + [new_password]
i = 1
else:
new_password = [line_str[3]]
# 如果密码是16进制,new_password 取表中密码,赋予Dev_pw
dev_pw = dev_pw + new_password
# 密码有加密,则重新写xls表,否则不需要重写
# print(dev_ip)
# print(dev_user)
# print(dev_pw)
# print(file_path)
if i == 1:
MachiningText(self.xlsx_file_name).write_xlsx_file(dev_name, dev_ip, dev_user, dev_pw, file_path)
if __name__ == '__main__':
key_word = 'QWErtyuio@098765'
init_vector_word = 'MEIYuanTian--Xia'
MachiningText(r'./dev_info.xlsx').update_password(key_word, init_vector_word)
print("完成")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 模拟客户端进行FTP服务器验证
import ftplib
import utils
class FTPServer:
def __init__(self, ftp_server):
self.ftp_server = ftp_server
def connect_ftp_server(self, ftp_user, ftp_password) -> bool:
print("正在检测FTP服务器,请稍候......")
try:
ftp = ftplib.FTP(self.ftp_server)
print(ftp.getwelcome())
ftp.login(ftp_user, ftp_password)
ftp.encoding = 'utf-8'
# 230 Logged in 230代表登录成功
login_response = ftp.login(ftp_user, ftp_password)
# 通过指定分隔符对字符串进行切片 login_response.split(" ")[0]
if int(login_response.split(" ")[0]) == 230:
print("FTP服务器登陆成功!")
utils.writelog("FTP服务器登陆成功!\n")
return True
else:
return False
except ConnectionRefusedError:
print("FTP服务器无法登录,请检查服务是否启动!")
utils.writelog("FTP服务器无法登录,请检查服务是否启动!\n")
return False
if __name__ == '__main__':
user = 'python'
password = '123456'
server = '10.10.10.2'
FTPServer(server).connect_ftp_server(user, password)
print("完成")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 代码运行日志生成器
import datetime
import msvcrt
def writelog(logstr):
try:
f = open('./result.log', 'a')
out = f.write(str(datetime.datetime.now()) + ":-->" + logstr)
f.close()
return out
except Exception as _result:
return print(_result)
# 如果不打包exe不需要 借鉴前人经验
def prompt_msg(tmep_str):
print("------------------------------------------------------------------")
print("如发现程序有问题,请联系作者!")
print("按任意键退出......")
print("------------------------------------------------------------------")
return ord(msvcrt.getch())
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 利用paramiko进行网络设备ssh连接和备份命令操作
from configuration_rw import YamlReadWrite
from write_excels import MachiningText
from ftp_server import FTPServer
from password_aes import CryptPassword
import pandas as pd
import paramiko
import time
import utils
import os
def connect_device_ssh(dev_name, dev_ip, dev_username, dev_pw, filename_path):
try:
# 创建一个SSH客户端
ssh_client = paramiko.SSHClient()
# AutoAddPolicy: 自动添加主机名和主机密钥
# set_missing_host_key_policy: 一个连接主机的策略(没有本地主机秘钥或者HostKeys对象时)
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(hostname=dev_ip, port=22, username=dev_username, password=dev_pw)
# 打开连接到的终端,使用ssh shell通道,相当于使用ssh远程到了主机上
command = ssh_client.invoke_shell()
# send发送内容,然后将字符串进行编码,编码后中文仍乱码
command.send(("ftp " + _FTP_HOST).encode())
command.send(b"\n")
command.send(_FTP_USER + "\n")
time.sleep(1)
command.send(_FTP_PW + "\n")
time.sleep(1)
command.send(b"\n")
time.sleep(1)
command.send(b"\n")
# 备份flash:/vrpcfg.zip,并提取配置名称
cfg_name = filename_path.split("/")[1].split(".")
expanded_name = "." + cfg_name[1]
# 拼接备份文件名称
bak_file_name = cfg_name[0] + "-" + dev_ip + "-" + time.strftime("%Y-%m-%d") + expanded_name
# print (Bak_File_Name)
command.send("put " + filename_path + " " + bak_file_name + "\n")
# print("put " + filename_path + " " + Bak_File_Name + "\n")
time.sleep(1)
command.send(b"\n")
received = command.recv(65535).decode()
# 备份成功recv 包含“226 File received ok”
if "226 File has been recieved" in received:
print(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份成功!")
utils.writelog(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份成功!\n")
ssh_client.close()
except:
print(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份失败。")
utils.writelog(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份失败。######\n")
# recv = command.recv(65535).decode() # recv接收响应,字节为65535,并进行解码
# result = recv.splitlines()
# print(recv)
if __name__ == '__main__':
# key: 16或16的倍数
key = 'QWErtyuio@098765'
# init_vector: 16位
init_vector = 'MEIYuanTian--Xia'
# 读取程序当前目录路径
path = os.path.dirname(__file__)
read_data = YamlReadWrite(path).yaml_read(file_name='AutoBakCfg.yml')
if os.path.exists('./result.log'):
os.remove('./result.log')
# 私有常量
_IS_ENCRYPT = int(read_data['base-info']['is_encrypt'])
_FTP_HOST = read_data['base-info']['ftp_server']
_FTP_USER = read_data['base-info']['ftp_user']
_FTP_PW = read_data['base-info']['ftp_password']
# _IS_ENCRYPT = 1, 把'./dev_info.xlsx中密码字段的明文密码改为密文(AES)
if _IS_ENCRYPT == 1:
xlsx_file_name = r'./Dev_info.xlsx'
MachiningText(xlsx_file_name).update_password(key, init_vector)
# 加密后把 _IS_ENCRYPT = 0
YamlReadWrite(path).yaml_write(file_name='AutoBakCfg.yml', encrypt_str='0')
utils.writelog("把当前明文密码修改成功为密文密码并更改配置文件为is_encrypt参数为0")
else:
utils.writelog("dev_info.xlsx文件中密码字段已加密!")
print("dev_info.xlsx文件中密码字段已加密!")
if FTPServer(_FTP_HOST).connect_ftp_server(_FTP_USER, _FTP_PW):
dev_file_name = './dev_info.xlsx'
dev_pd = pd.read_excel(dev_file_name, sheet_name='dev_info')
dev_info = dev_pd.values
# Dev_Info.head()
for line_str in dev_info:
# print(line_str[0], line_str[1], line_str[2], line_str[3], line_str[4])
dev_name_str = line_str[0]
dev_ip_str = line_str[1]
dev_username_str = line_str[2]
dev_pw_str = CryptPassword(key, init_vector).decode_str(line_str[3])
# print (PassWord)
filename_path_str = line_str[4]
connect_device_ssh(dev_name_str, dev_ip_str, dev_username_str, dev_pw_str, filename_path_str)
# prompt_msg("备份完成,备份文件保存在FTP服务器的目录下")
utils.writelog("备份完成,备份文件保存在FTP服务器的目录下")
else:
# prompt_msg("连接FTP服务器失败,请检查FTP服务是否启动,服务器IP,账号、密码是否正确。")
utils.writelog("连接FTP服务器失败,请检查FTP服务是否启动,服务器IP,账号、密码是否正确。")
借助paramiko连接,可以手动备份华为和H3C网络设备配置文件。 未来将采用netmiko进行构建,并增加对其他厂商网络设备的支持。
-EOF-
过去推荐的