发布信息

Python脚本备份需求说明及自我实现的思路拓展

作者:软荐小编      2023-09-17 22:06:31     248

首先,在等级防护评估要求中,应提供数据的本地备份机制,每晚本地备份软件备份工具,异地存储,包括主要网络设备配置文件。

其次,对于日常网络运维来说,为了审计和避免因设备故障而导致网络长期故障导致设备配置丢失,需要对网络设备的配置进行策略备份。

开源软件

开源软件场景:跨平台、支持多厂商系统、UI界面友好、功能强大、备份策略可定制。

自写脚本

一般现在网络常用的脚本有python脚本和GO脚本,其中Python脚本比较适用。

自写脚本场景:定制性强,适应性高,可根据自身需求不断优化迭代。

Python脚本备份需求描述:

公司内部企业网络交换机、防火墙等配套设备需要配置备份,必须满足以下要求:

思路拓展:

1. 如何先支持自己的想法?

无非是执行命令查看全局配置,然后抓取回显并保存为配置文件。 TFTP/FTP登录网元复制配置文件,或者网元作为客户端。 FTP/TFTP建立在Linux/Windows主机上。 现在很多厂商的新设备都支持手动备份配置命令,这也是一种方法。

2. 多厂商切换系统支持。 如果使用FTP,则不需要考虑太多,因为基本上每个厂家都支持FTP。

3、每个设备的密码不同,需要对密码进行加密和解密。 执行命令时不会直接显示明文密码。

4、备份成功后,需要通过阿里云邮箱发送短信通知。

5. 如需自定义备份策略,需要单独启用该模块。

开源参考: 自行实现:

1、前期完成基本需求,很快会在测试环境中使用进行测试,继续完善软件备份工具,后续应用到生产环境中。

2、中期考虑如何结合WebUI进行监控和操作。

3、后期可以考虑生成一个比较完善的网络设备备份系统平台,实现整体闭环,并开源到Github和Gitee。

程序示例 网络设备配置备份程序 程序结构说明

当前目录依赖包生成:

pip install pipreqs
pipreqs . --encoding=utf8 --force

目录结构环境说明网络设备支持FTP服务器

UniversalFTPServer(可以直接从Microsoft Store下载)

软件备份工具_备份工具软件有哪些_备份工具软件下载

通用FTP服务器

程序执行结果说明

程序会手动将网络设备配置文件备份到服务器根目录。

代码示例

软件备份工具_备份工具软件下载_备份工具软件有哪些

例子

base-info:
  ftp_password: '123456'
  ftp_server: 10.10.10.2
  ftp_user: python
  is_encrypt: '1'

pandas==1.5.3
paramiko==3.0.0
pycryptodome==3.17
PyYAML==6.0

软件备份工具_备份工具软件下载_备份工具软件有哪些

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Linux下和Windows下安装pycryptodome 把读取的xlsx文档的密码进行加密和解密处理
# pip install pycryptodome
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Util.Padding import unpad
from binascii import b2a_hex, a2b_hex
import re


class CryptPassword:
    """
    AES.new 初始化AES对象
    "
""

    def __init__(self, key, init_vector):
        """
        :param key: 初始化AES加密密钥 AES.new传入参数必须是bytes类型
        :param init_vector: 初始化iv偏移量 AES.new传入参数必须是bytes类型
        "
""
        self.key = key
        self.init_vector = init_vector
        self.mode = AES.MODE_CBC
        # self.cryptor = AES.new(self.key, self.mode, self.init_vector)

    # AES加密 待加密的字符串为str类型并返回str类型数据
    def encrypt_str(self, pass_str) -> str:
        # encode('ascii') 字符串转字节串,把字符串:abc,转为字节串:b'abc'
        cryptor = AES.new(self.key.encode('ascii'), self.mode, self.init_vector.encode('ascii'))
        # pass_str str类型 pad的data_to_pad必须为bytes类型 故pass_str需要转换为字节串
        padtext = pad(pass_str.encode('ascii'), 16, style='pkcs7')
        # 调用加密方法
        cipher_text = cryptor.encrypt(padtext)
        # 字符串 --> 十六进制
        str_cipher = b2a_hex(cipher_text)
        return str_cipher.decode('ascii')

    # AES解密 待解密的字符串为str类型并返回str类型数据
    def decode_str(self, text_str) -> str:
        if not CryptPassword.is_hex(text_str) or (len(text_str) < 32):
            return "错误的16进制加密字符串"
        else:
            # 十六进制 --> 字符串
            cipher_text = a2b_hex(text_str)
            decrypter = AES.new(self.key.encode('ascii'), self.mode, self.init_vector.encode('ascii'))
            # 调用解密方法
            plain_text = decrypter.decrypt(cipher_text)
            # 不够16位,补全到16位
            try:
                unpadtext = unpad(plain_text, 16, 'pkcs7')
                # decode('ascii') 字节串转字符串,把字节串:b'abc',转为字符串:abc
                return unpadtext.decode('ascii')
            except ValueError:
                return "请输入正确的加密字符串"

    # 16进制检查
    @staticmethod
    def is_hex(hex_str) -> bool:
        # 16进制的正则表达式
        regex = "[A-Fa-f0-9]+$"
        result = re.match(regex, hex_str)
        if result:
            return True
        else:
            return False


# 测试用例
if __name__ == '__main__':
    key_word = 'QWErtyuio@098765'
    init_vector_word = 'MEIYuanTian--Xia'
    password = 'T5fx2L3aFt'
    data = CryptPassword(key_word, init_vector_word)
    print(data.encrypt_str(password))
    print(CryptPassword.is_hex(data.encrypt_str(password)))
    text = 'c0cf24e87b328810c0b5711b2f326c8f'
    print(data.decode_str(text))

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
针对yaml/yml等配置文件进行读取和写入的实现类
"
""

import yaml
import os
import pathlib


class YamlReadWrite:
    # 初始化
    def __init__(self, filepath):
        self.filepath = filepath

    def yaml_read(self, file_name):
        # filename = str(file_name)
        # print(filename)
        if os.path.isfile(file_name):
            with open(self.filepath + '\\' + file_name, 'r', encoding='utf-8') as fp:
                line_data = yaml.load(fp, Loader=yaml.FullLoader)
                return line_data
        else:
            print("配置文件不存在")

    def yaml_write(self, file_name, encrypt_str):
        # 读取yaml文件数据
        if os.path.isfile(file_name):
            old_file_data = YamlReadWrite(self.filepath).yaml_read(file_name)
            # 修改读取数据
            old_file_data['base-info']['is_encrypt'] = encrypt_str
            with open(self.filepath + '\\' + file_name, 'w', encoding='utf-8') as fp:
                yaml.dump(old_file_data, fp)
                return "密码已加密,配置文件加密参数修改为0"
        else:
            print("配置文件不存在")


if __name__ == '__main__':
    path = os.path.dirname(__file__)
    read_data = YamlReadWrite(path).yaml_read(file_name='AutoBakCfg.yml')
    print(read_data['base-info']['ftp_password'])
    write_data = YamlReadWrite(path).yaml_write(file_name='AutoBakCfg.yml', encrypt_str='0')
    print(write_data)
    print("修改完成")

write_excels.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
利用pandas模块读写excel文本数据,并对文本数据中的密码进行判断,如果不是16进制字符串,那就进行加密并重新写入,读取16进制字符串解密进行ftp登录
依赖模块: pandas,openpyxl
"
""
import pandas as pd
from password_aes import CryptPassword


class MachiningText:

    def __init__(self, xlsx_file_name):
        self.xlsx_file_name = xlsx_file_name

    # 更新格式化后的密码并写入xlsx文件中
    def write_xlsx_file(self, dev_name, dev_ip, dev_user, dev_pw, file_path):
        data_df = pd.Dataframe()
        data_df["设备名称"] = dev_name
        data_df["设备IP"] = dev_ip
        data_df["账号"] = dev_user
        data_df["密码"] = dev_pw
        data_df["文件路径"] = file_path
        try:
            writer = pd.ExcelWriter(self.xlsx_file_name)
            data_df.to_excel(writer, sheet_name='dev_info', index=False)
            writer.close()
        except PermissionError:
            print("请先关闭文档,再次尝试")

    def update_password(self, key, init_vector):
        dev_name = []
        dev_ip = []
        dev_user = []
        dev_pw = []
        file_path = []
        dev_pd = pd.read_excel(self.xlsx_file_name, sheet_name='dev_info')
        # 赋予i=0 密码已加密,i=1 把明文密码转为加密密码
        i = 0

        for line_str in dev_pd.values:
            dev_name = dev_name + [line_str[0].strip()]
            dev_ip = dev_ip + [line_str[1].strip()]
            dev_user = dev_user + [line_str[2].strip()]
            # try:
            #     dev_pw = dev_pw + [line_str[3].strip()]
            # except AttributeError:
            #     dev_pw = dev_pw + [line_str[3]]
            file_path = file_path + [line_str[4].strip()]

            # 如果密码是单纯的数字 len(123)会出错,需要转换为str类型
            try:
                if int(line_str[3]):
                    password_len = len(str(line_str[3]))
                    passwd_str = str(line_str[3])
                    if not CryptPassword.is_hex(passwd_str) or (password_len < 32):
                        new_password = CryptPassword(key, init_vector).encrypt_str(passwd_str)
                        # print(new_password)
                        # print(dev_pw)
                        dev_pw = dev_pw + [new_password]
                        i = 1
                elif str(line_str[3].strip()):
                    password_len = len(line_str[3].strip())
                    if not CryptPassword.is_hex(line_str[3].strip()) or (password_len < 32):
                        new_password = CryptPassword(key, init_vector).encrypt_str(line_str[3].encode('ascii'))
                        dev_pw = dev_pw + [new_password]
                        i = 1
                else:
                    return "不存在的类型"
            except ValueError:
                # 密码不是16进制,则进行加密
                password_len = len(line_str[3].strip())
                if (not CryptPassword.is_hex(line_str[3].strip())) or (password_len < 32):
                    new_password = CryptPassword(key, init_vector).encrypt_str(line_str[3].strip())
                    dev_pw = dev_pw + [new_password]
                    i = 1
                else:
                    new_password = [line_str[3]]
                    # 如果密码是16进制,new_password 取表中密码,赋予Dev_pw
                    dev_pw = dev_pw + new_password
        # 密码有加密,则重新写xls表,否则不需要重写
        # print(dev_ip)
        # print(dev_user)
        # print(dev_pw)
        # print(file_path)
        if i == 1:
            MachiningText(self.xlsx_file_name).write_xlsx_file(dev_name, dev_ip, dev_user, dev_pw, file_path)


if __name__ == '__main__':
    key_word = 'QWErtyuio@098765'
    init_vector_word = 'MEIYuanTian--Xia'
    MachiningText(r'./dev_info.xlsx').update_password(key_word, init_vector_word)
    print("完成")

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 模拟客户端进行FTP服务器验证

import ftplib
import utils


class FTPServer:
    def __init__(self, ftp_server):
        self.ftp_server = ftp_server

    def connect_ftp_server(self, ftp_user, ftp_password) -> bool:
        print("正在检测FTP服务器,请稍候......")
        try:
            ftp = ftplib.FTP(self.ftp_server)
            print(ftp.getwelcome())
            ftp.login(ftp_user, ftp_password)
            ftp.encoding = 'utf-8'
            # 230 Logged in 230代表登录成功
            login_response = ftp.login(ftp_user, ftp_password)
            # 通过指定分隔符对字符串进行切片 login_response.split(" ")[0]
            if int(login_response.split(" ")[0]) == 230:
                print("FTP服务器登陆成功!")
                utils.writelog("FTP服务器登陆成功!\n")
                return True
            else:
                return False
        except ConnectionRefusedError:
            print("FTP服务器无法登录,请检查服务是否启动!")
            utils.writelog("FTP服务器无法登录,请检查服务是否启动!\n")
            return False


if __name__ == '__main__':
    user = 'python'
    password = '123456'
    server = '10.10.10.2'
    FTPServer(server).connect_ftp_server(user, password)
    print("完成")

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 代码运行日志生成器

import datetime
import msvcrt


def writelog(logstr):
    try:
        f = open('./result.log''a')
        out = f.write(str(datetime.datetime.now()) + ":-->" + logstr)
        f.close()
        return out
    except Exception as _result:
        return print(_result)

# 如果不打包exe不需要 借鉴前人经验
def prompt_msg(tmep_str):
    print("------------------------------------------------------------------")
    print("如发现程序有问题,请联系作者!")
    print("按任意键退出......")
    print("------------------------------------------------------------------")
    return ord(msvcrt.getch())

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 利用paramiko进行网络设备ssh连接和备份命令操作
from configuration_rw import YamlReadWrite
from write_excels import MachiningText
from ftp_server import FTPServer
from password_aes import CryptPassword
import pandas as pd
import paramiko
import time
import utils
import os


def connect_device_ssh(dev_name, dev_ip, dev_username, dev_pw, filename_path):
    try:
        # 创建一个SSH客户端
        ssh_client = paramiko.SSHClient()
        # AutoAddPolicy: 自动添加主机名和主机密钥
        # set_missing_host_key_policy: 一个连接主机的策略(没有本地主机秘钥或者HostKeys对象时)
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh_client.connect(hostname=dev_ip, port=22, username=dev_username, password=dev_pw)
        # 打开连接到的终端,使用ssh shell通道,相当于使用ssh远程到了主机上
        command = ssh_client.invoke_shell()
        # send发送内容,然后将字符串进行编码,编码后中文仍乱码
        command.send(("ftp " + _FTP_HOST).encode())
        command.send(b"\n")
        command.send(_FTP_USER + "\n")
        time.sleep(1)
        command.send(_FTP_PW + "\n")
        time.sleep(1)
        command.send(b"\n")
        time.sleep(1)
        command.send(b"\n")

        # 备份flash:/vrpcfg.zip,并提取配置名称
        cfg_name = filename_path.split("/")[1].split(".")
        expanded_name = "." + cfg_name[1]

        # 拼接备份文件名称
        bak_file_name = cfg_name[0] + "-" + dev_ip + "-" + time.strftime("%Y-%m-%d") + expanded_name
        # print (Bak_File_Name)
        command.send("put " + filename_path + " " + bak_file_name + "\n")
        # print("put " + filename_path + " " + Bak_File_Name + "\n")
        time.sleep(1)
        command.send(b"\n")

        received = command.recv(65535).decode()
        # 备份成功recv 包含“226 File received ok”
        if "226 File has been recieved" in received:
            print(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份成功!")
            utils.writelog(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份成功!\n")
        ssh_client.close()
    except:
        print(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份失败。")
        utils.writelog(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份失败。######\n")

    # recv = command.recv(65535).decode()  # recv接收响应,字节为65535,并进行解码
    # result = recv.splitlines()
    # print(recv)


if __name__ == '__main__':

    # key: 16或16的倍数
    key = 'QWErtyuio@098765'
    # init_vector: 16位
    init_vector = 'MEIYuanTian--Xia'
    # 读取程序当前目录路径
    path = os.path.dirname(__file__)
    read_data = YamlReadWrite(path).yaml_read(file_name='AutoBakCfg.yml')

    if os.path.exists('./result.log'):
        os.remove('./result.log')

    # 私有常量
    _IS_ENCRYPT = int(read_data['base-info']['is_encrypt'])
    _FTP_HOST = read_data['base-info']['ftp_server']
    _FTP_USER = read_data['base-info']['ftp_user']
    _FTP_PW = read_data['base-info']['ftp_password']

    #  _IS_ENCRYPT = 1, 把'./dev_info.xlsx中密码字段的明文密码改为密文(AES)
    if _IS_ENCRYPT == 1:
        xlsx_file_name = r'./Dev_info.xlsx'
        MachiningText(xlsx_file_name).update_password(key, init_vector)
        # 加密后把 _IS_ENCRYPT = 0
        YamlReadWrite(path).yaml_write(file_name='AutoBakCfg.yml', encrypt_str='0')
        utils.writelog("把当前明文密码修改成功为密文密码并更改配置文件为is_encrypt参数为0")
    else:
        utils.writelog("dev_info.xlsx文件中密码字段已加密!")
        print("dev_info.xlsx文件中密码字段已加密!")

    if FTPServer(_FTP_HOST).connect_ftp_server(_FTP_USER, _FTP_PW):
        dev_file_name = './dev_info.xlsx'
        dev_pd = pd.read_excel(dev_file_name, sheet_name='dev_info')
        dev_info = dev_pd.values
        # Dev_Info.head()
        for line_str in dev_info:
            # print(line_str[0], line_str[1], line_str[2], line_str[3], line_str[4])
            dev_name_str = line_str[0]
            dev_ip_str = line_str[1]
            dev_username_str = line_str[2]
            dev_pw_str = CryptPassword(key, init_vector).decode_str(line_str[3])
            # print (PassWord)
            filename_path_str = line_str[4]
            connect_device_ssh(dev_name_str, dev_ip_str, dev_username_str, dev_pw_str, filename_path_str)

        # prompt_msg("备份完成,备份文件保存在FTP服务器的目录下")
        utils.writelog("备份完成,备份文件保存在FTP服务器的目录下")
    else:
        # prompt_msg("连接FTP服务器失败,请检查FTP服务是否启动,服务器IP,账号、密码是否正确。")
        utils.writelog("连接FTP服务器失败,请检查FTP服务是否启动,服务器IP,账号、密码是否正确。")

借助paramiko连接,可以手动备份华为和H3C网络设备配置文件。 未来将采用netmiko进行构建,并增加对其他厂商网络设备的支持。

-EOF-

过去推荐的

相关内容 查看全部