自留地,生产队 | python代码速记 | python 技术论坛-江南app体育官方入口
import telnetlib
import time
import re
import pandas as pd
import os
import uuid
import schedule
import glob
import matplotlib.pyplot as plt
def do_telnet():
host_ip = r’129.60.161.169’
username = ‘zte’
password = ‘zte’
path = r’e:\自用\实用\omscript-master\‘
#登录网元
tn = telnetlib.telnet()
tn.open(host_ip, port=23, timeout=5)
tn.read_until(b'username:', timeout=5)
tn.write(username.encode('ascii') b'\n')
tn.read_until(b'password:', timeout=5)
tn.write(password.encode('ascii') b'\n')
time.sleep(1)
command_result = tn.read_until(b'#', timeout=5)
if b'#' not in command_result:
print('%s登录失败' % host_ip)
else:
print('%s登录成功' % host_ip)
#查看端口状态
command = "show interface brief"
command = bytes(command, encoding='utf-8')
tn.write(command b'\n')
time.sleep(1)
result_list = []
while (true):
command_result = tn.read_very_eager().decode('ascii')
# print(command_result)
result_list.append(command_result)
if re.findall(r"--more--", command_result.strip()):
tn.write(b" ")
elif re.findall(r"#", command_result.strip()):
break
else:
time.sleep(0.05)
continue
result_str = "\n".join(result_list)
list_str = result_str.split('\n')
print(result_str)
#列出端口为down端口
pattern = re.compile(r'(\s )\s \s \s \s \s \s \s (down|up)\s \s \s \s ')
down_interfaces = []
for match in pattern.finditer(command_result):
interface, admin_state = match.groups()
if admin_state == 'down':
down_interfaces.append(interface)
print(down_interfaces)
#开启down端口 以xgei-0/2/0/1为例
command = "conf t"
command = bytes(command, encoding='utf-8')
tn.write(command b'\r\n')
run_time = tn.read_until(b'#')
command = "interface xgei-0/2/0/1"
command = bytes(command, encoding='utf-8')
tn.write(command b'\r\n')
run_time = tn.read_until(b'#')
command = "no shutdown"
command = bytes(command, encoding='utf-8')
tn.write(command b'\r\n')
run_time = tn.read_until(b'#')
command = "end"
command = bytes(command, encoding='utf-8')
tn.write(command b'\r\n')
run_time = tn.read_until(b'#')
command = "exit"
command = bytes(command, encoding='utf-8')
tn.write(command b'\r\n')
# print(down_interfaces)
#查看光功率的配置
command = "show opticalinfo brief"
command = bytes(command, encoding='utf-8')
tn.write(command b'\n')
time.sleep(1)
result_list = []
while (true):
command_result = tn.read_very_eager().decode('ascii')
# print(command_result)
result_list.append(command_result)
if re.findall(r"--more--", command_result.strip()):
tn.write(b" ")
elif re.findall(r"#", command_result.strip()):
break
else:
time.sleep(0.05)
continue
result_str = "\n".join(result_list)
list_str = result_str.split('\n')
print(result_str)
#ospf故障处理
command = "show clock"
command = bytes(command, encoding='utf-8')
tn.write(command b'\r\n')
run_time = tn.read_until(b'#')
run_time = re.findall(r"\d :\d :\d \s \w \s \w \s \w \s \d \s 2024", run_time.decode('gb18030'))[0]
command = "show ip ospf neighbor detail"
command = bytes(command, encoding='utf-8')
tn.write(command b'\n')
time.sleep(1)
result_list = []
while (true):
command_result = tn.read_very_eager().decode('ascii')
# print(command_result)
result_list.append(command_result)
if re.findall(r"--more--", command_result.strip()):
tn.write(b" ")
elif re.findall(r"#", command_result.strip()):
break
else:
time.sleep(0.05)
continue
result_str = "\n".join(result_list)
dict_ouput = {}
dict_ouput["host_ip"] = host_ip
dict_ouput["time"] = run_time
startpattern = re.compile(r'ospf router with id (. )')
strtext = re.search(startpattern, str(result_str)).group(1)
dict_ouput["ospf router with id"] = strtext
startpattern = re.compile(r'neighbor\s (\d .\d .\d .\d )')
strtext = re.search(startpattern, str(result_str)).group(1)
dict_ouput["neighbor"] = strtext
startpattern = re.compile(r'in the area\s (. )')
strtext = re.search(startpattern, str(result_str)).group(1)
dict_ouput["area"] = strtext
startpattern = re.compile(r'state\s (\w ), ')
strtext = re.search(startpattern, str(result_str)).group(1)
dict_ouput["state"] = strtext
pd_output = pd.dataframe.from_dict([dict_ouput])
pd_output['time'] = pd.to_datetime(pd_output['time'])
pd_output['time'] = pd_output['time'].apply(lambda x: x.strftime('%y-%m-%d %h:%m:%s'))
pd_output.to_csv(pathos.sepr'ospf''-'str(host_ip)'.csv', index=none, encoding='gb18030')
tn.close()
#导出up状态的端口流量
pd_result_2 = pd.dataframe()
list_temperature_vec = []
for j in list_str:
regex = re.compile(r'\w gei. \s . \s . \s . \s . \s . \s . ', re.s)
if len(re.findall(r"interface", j)) > 0:
new_columns = list_find_str = re.split(r'\s ', j)
new_columns = new_columns[0:8]
if len(regex.findall(j)) > 0:
list_find_str = regex.findall(j)[0]
list_find_str = re.split(r'\s ', list_find_str)
list_temperature_vec.append(list_find_str)
pd_result_2 = pd.dataframe(list_temperature_vec)
pd_result_2.columns = new_columns
pd_result_2 = pd_result_2[pd_result_2['phy'] == 'up']
# pd_output = pd.dataframe.pd_result_2
# print(pd_output)
# print(pd_result_2)
# 端口流量及状态检查
pd_output = pd.dataframe()
for check_port in pd_result_2['interface']:
pd_each_port = pd.dataframe()
dict_ouput = {}
dict_ouput["port_name"] = check_port
dict_ouput["time"] = run_time
command = " show interface " check_port
command = bytes(command, encoding='utf-8')
tn.write(command b'\n')
time.sleep(1)
result_list = []
while (true):
command_result = tn.read_very_eager().decode('ascii')
# print(command_result)
result_list.append(command_result)
if re.findall(r"--more--", command_result.strip()):
tn.write(b" ")
elif re.findall(r"#", command_result.strip()):
break
else:
time.sleep(0.05)
continue
result_str = "\n".join(result_list)
startpattern = re.compile(r'in_bytes\s (\d )\s ')
strtext = re.search(startpattern, str(result_str)).group(1)
strtext = int(strtext)
dict_ouput["rx_bytes"] = strtext
startpattern = re.compile(r'e_bytes\s (\d )\s ')
strtext = re.search(startpattern, str(result_str)).group(1)
strtext = int(strtext)
dict_ouput["tx_bytes"] = strtext
pd_output = pd.concat([pd_output, pd.dataframe.from_dict([dict_ouput])], axis=0)
pd_output['time'] = pd.to_datetime(pd_output['time'])
pd_output['time'] = pd_output['time'].apply(lambda x: x.strftime('%y-%m-%d %h:%m:%s'))
pd_output["id"] = pd_output["port_name"] '_' pd_output["time"]
random_suffix = uuid.uuid4().hex[:8]
# 构造文件名,并保存dataframe到csv文件
filename = f"端口流量-{host_ip}-{random_suffix}.csv"
filepath = os.path.join(path, filename)
# 保存dataframe到csv文件
pd_output.to_csv(filepath, index=false, encoding='gb18030')
#绘制折线图
file_pattern = r”e:\竞赛\电信大数据实战\omscript-master\csv数据*.csv”
csv_files = glob.glob(file_pattern)
data_frames = []
for file in csv_files:
# 读取csv文件到dataframe
df = pd.read_csv(file,encoding='gb18030')
# 将读取的数据添加到列表
data_frames.append(df)
combined_df = pd.concat(data_frames, ignore_index=true)
time_column_index = 1 # 这里假设时间列是第一列,根据实际情况调整
time_column_name = combined_df.columns[time_column_index]
combined_df[time_column_name] = pd.to_datetime(combined_df[time_column_name])
combined_df = combined_df.sort_values(by=time_column_name)
plt.rcparams[‘font.sans-serif’] = [‘simhei’]
plt.rcparams[‘axes.unicode_minus’] = false
x = combined_df.iloc[:, 1] # 获取第二列作为x轴数据
y1 = combined_df.iloc[:, 2] # 获取第三列作为y轴数据
y2 = combined_df.iloc[:, 3] # 获取第四列作为y轴数据
y1_diff= y1.diff()
y2_diff= y2.diff()
y1_diff= y1_diff.dropna()
y2_diff= y2_diff.dropna()
x_diff=x[y1_diff.index]
plt.figure() # 创建一个新的图表
plt.plot(x_diff, y1_diff, marker=’o’, linestyle=’-‘, color=’blue’,label=combined_df.columns[2])
plt.plot(x_diff, y2_diff, marker=’s’, linestyle=’–’, color=’red’,label=combined_df.columns[3])# 绘制折线图,并用圆圈标记每个数据点
plt.title(“xgei-0/2/0/2出入口流量”) # 图表标题
plt.xlabel(combined_df.columns[1]) # x轴标签,使用csv的第二列标题
plt.ylabel(“流量”) # y轴标签,使用csv的第三列标题
plt.legend() # 显示图例
plt.grid(true) # 显示网格
plt.xticks(rotation=90)
plt.tight_layout()
plt.show()
#定时循环执行操作
def do_telnet_job():
do_telnet()
if name == ‘main‘:
schedule.every(5).seconds.do(do_telnet_job)
while true:
schedule.run_pending()
time.sleep(1)