2023-12-22 更新 562 阅读
终于抽时间把某些软件自动更新端口的脚本写出来了,适用于多台服务器使用相同账号密码的情况,稍作修改也适用于各服务器密码不同的情况。在服务器上设置定时任务运行检测即可,代码如下,懂的都懂🐶
Python脚本
import socket
import paramiko
import json
import base64
import re
import logging
import requests
# Configure the root logger so INFO-level (and more severe) records are emitted.
logging.basicConfig(level=logging.INFO)
class SSHClient:
    """Small wrapper around ``paramiko.SSHClient`` for commands and remote files.

    The constructor tries to connect immediately. If the connection fails the
    error is logged and ``self.client`` is set to ``None``; callers can test
    ``ssh.client is None`` to detect that. Instances can be used as context
    managers so the connection is always closed.
    """

    def __init__(self, host, port=22, username='root', password='6666', timeout=10):
        """Connect to ``host``.

        Args:
            host: Host name or IP address of the SSH server.
            port: SSH port.
            username: Login user.
            password: Login password.
            timeout: Seconds to wait for the TCP connection before giving up.
        """
        self.client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts any unknown host key, which
        # leaves the connection open to man-in-the-middle attacks. Prefer
        # loading known host keys when the environment allows it.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.client.connect(host, port, username, password, timeout=timeout)
        except Exception as e:
            logging.error(f"Failed to connect to {host}:{port}. Error: {e}")
            # Release whatever the failed attempt left open before dropping it.
            self.client.close()
            self.client = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def _require_client(self):
        """Return the live paramiko client or raise ``ConnectionError``."""
        if self.client is None:
            raise ConnectionError("SSH connection is not established")
        return self.client

    def exec_command(self, command):
        """Run ``command`` remotely and return ``(stdout, stderr)`` as text.

        Raises:
            ConnectionError: If the SSH connection was never established.
        """
        client = self._require_client()
        stdin, stdout, stderr = client.exec_command(command)
        return stdout.read().decode('utf-8'), stderr.read().decode('utf-8')

    def file_operation(self, file_path, mode='r', content=None):
        """Read or write a remote file over SFTP.

        Args:
            file_path: Path of the file on the remote host.
            mode: ``'r'`` to read, ``'w'`` to overwrite with ``content``.
            content: Data to write when ``mode`` is ``'w'``.

        Returns:
            The decoded file text for ``'r'``. ``None`` for ``'w'``, and also
            ``None`` when opening/reading/writing the file fails (the error
            is logged).

        Raises:
            ValueError: If ``mode`` is neither ``'r'`` nor ``'w'``.
            ConnectionError: If the SSH connection was never established.
        """
        if mode not in ('r', 'w'):
            raise ValueError(f"Unsupported mode {mode!r}; expected 'r' or 'w'")
        client = self._require_client()
        with client.open_sftp() as sftp_client:
            try:
                with sftp_client.open(file_path, mode) as f:
                    if mode == 'r':
                        return f.read().decode('utf-8')
                    f.write(content)
            except Exception as e:
                logging.error(f"Failed to {mode} file {file_path}. Error: {e}")
        return None

    def close(self):
        """Close the connection; safe to call more than once."""
        if self.client is not None:
            self.client.close()
            self.client = None
def update_remote_file(host, file_path, old_host, new_port):
    """Rewrite the port that follows ``old_host:`` in a base64-encoded remote file.

    Connects to ``host`` over SSH, reads ``file_path``, base64-decodes it,
    replaces the digits after every ``old_host:`` with ``new_port``,
    re-encodes the text and writes it back. Failures to connect or to read
    the file are logged and the function returns without writing.

    Args:
        host: SSH host that stores the file.
        file_path: Remote path of the base64-encoded file.
        old_host: Host/IP whose port should be replaced inside the file.
        new_port: New port (int or str) to put after ``old_host:``.
    """
    ssh_client = SSHClient(host)
    try:
        if ssh_client.client is None:
            logging.error(f"Cannot update {file_path}: no SSH connection to {host}")
            return
        raw_content = ssh_client.file_operation(file_path)
        if raw_content is None:
            logging.error(f"Cannot update {file_path} on {host}: file could not be read")
            return
        file_content = base64.b64decode(raw_content).decode('utf-8')
        # The lookbehind stops "1.2.3.4" from also matching inside "21.2.3.4"
        # or "sub.example.com" when old_host is "example.com".
        pattern = r'(?<![\w.-])(' + re.escape(old_host) + r':)\d+'
        modified_content = re.sub(
            pattern, lambda m: m.group(1) + str(new_port), file_content
        )
        if modified_content == file_content:
            logging.info(f"No port entry to change for {old_host} in {file_path}")
            return
        encoded_content = base64.b64encode(modified_content.encode('utf-8')).decode('utf-8')
        ssh_client.file_operation(file_path, 'w', encoded_content)
    finally:
        # Always release the SSH connection, even if decoding or writing fails.
        ssh_client.close()
def check_and_update_port(host, config_file_path, update_host, update_file_path, port_step=5):
    """Probe the port from a remote JSON config and move it if unreachable.

    Reads ``config_file_path`` on ``host`` over SSH, takes its ``local_port``
    value and tries a TCP connection to ``host`` on that port. If the
    connection succeeds nothing is changed. Otherwise ``local_port`` is
    increased by ``port_step``, the config is written back,
    ``systemctl restart trojan-go`` is run on the host, and
    ``update_remote_file`` is called so the list stored on ``update_host``
    points at the new port.

    Args:
        host: Server to probe; also the SSH host holding the config.
        config_file_path: Remote path of the JSON config.
        update_host: SSH host that stores the published list file.
        update_file_path: Path of the list file on ``update_host``.
        port_step: Amount added to the port when it is unreachable.
    """
    ssh_client = SSHClient(host)
    try:
        if ssh_client.client is None:
            logging.error(f"Cannot check {host}: no SSH connection")
            return
        file_content = ssh_client.file_operation(config_file_path)
        if file_content is None:
            logging.error(f"Cannot check {host}: {config_file_path} could not be read")
            return
        config = json.loads(file_content)
        port = config['local_port']

        # The context manager closes the socket even if connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)
            result = sock.connect_ex((host, port))

        if result == 0:
            logging.info(f"Port {port} is open on {host}")
            return

        new_port = port + port_step
        if new_port > 65535:
            # A port above 65535 is invalid and would break the service config.
            logging.error(f"Port {port} is closed on {host}, but {new_port} is out of range")
            return

        config['local_port'] = new_port
        new_file_content = json.dumps(config, indent=4)
        ssh_client.file_operation(config_file_path, 'w', new_file_content)
        output, error = ssh_client.exec_command("systemctl restart trojan-go")
        logging.info(f"Output: {output}")
        if error:
            # Only report stderr when the command actually wrote something.
            logging.error(f"Error: {error}")
        logging.info(f"Port {port} is closed on {host}, new port is {new_port}")
    finally:
        # Release this connection before opening the one to update_host.
        ssh_client.close()
    update_remote_file(update_host, update_file_path, host, str(new_port))
# Get URL content. A timeout keeps a scheduled run from hanging forever on a
# stalled connection.
try:
    response = requests.get('https://domain.com/list.md', timeout=10)
except requests.RequestException as e:
    print(f"Unable to retrieve URL content: {e}")
    raise SystemExit(1)
if response.status_code != 200:
    print("Unable to retrieve URL content")
    # SystemExit is used instead of exit(), which is only provided by the
    # ``site`` module and is not guaranteed to exist.
    raise SystemExit(1)
# Base64 decode
decoded_data = base64.b64decode(response.content).decode('utf-8')
# Extract IP and port using regex
ip_port_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b:\d{1,5}')
matches = ip_port_pattern.findall(decoded_data)
if not matches:
    print("Unable to find any IP:port pairs")
    raise SystemExit(1)
# Test and update port status for each IP:port pair
def is_port_open(ip, port, timeout=1):
    """Return True if a TCP connection to ``ip``:``port`` succeeds.

    Args:
        ip: Host address to connect to.
        port: TCP port number.
        timeout: Seconds to wait for the connection attempt.

    Returns:
        True when the connection is established; False when it is refused,
        times out, or the address/port is invalid.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        try:
            result = sock.connect_ex((ip, port))
        except (OSError, OverflowError):
            # connect_ex only converts errors from the C-level connect();
            # bad addresses (OSError) and ports outside 0-65535
            # (OverflowError) still raise, and both mean "not reachable".
            return False
    return result == 0
# dict.fromkeys drops duplicate ip:port entries while keeping their order, so
# the same server is not checked (and its port bumped) more than once per run.
for match in dict.fromkeys(matches):
    ip, port = match.split(':')
    port = int(port)
    if is_port_open(ip, port):
        print(f"Port {ip}:{port} is open")
        continue
    print(f"Port {ip}:{port} is closed")
    # NOTE(review): the original indentation was lost; the update call is
    # assumed to belong to the "closed" branch - confirm.
    try:
        check_and_update_port(ip, '/etc/trojan-go/config.json', 'domain.com', '/www/wwwroot/domain.com/list.md')
    except Exception:
        # One unreachable or misconfigured server must not stop the
        # remaining servers from being processed.
        logging.exception(f"Failed to check or update {ip}")
评论已关闭