2023-12-22 更新 303 阅读

终于抽时间把某些软件自动更新端口的脚本写出来了。它适用于多台服务器使用相同账号密码的场景,稍作修改也可用于各服务器密码不同的情况。在服务器上定时运行该脚本进行检测即可,代码如下,懂的都懂🐶

East of Eden- Zella Day-music

Python脚本

import socket
import paramiko
import json
import base64
import re
import logging
import requests

# Configure the root logger so that messages at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)

class SSHClient:
    """Small wrapper around ``paramiko.SSHClient`` for commands and SFTP file access.

    The connection is attempted in the constructor. If it fails, the error is
    logged and ``self.client`` is set to ``None``; callers can test that
    attribute to find out whether the connection is usable.

    Instances can be used as context managers, which closes the connection
    on exit.
    """

    # NOTE(review): the default credentials are hard-coded in the signature;
    # consider passing them in from configuration instead.
    def __init__(self, host, port=22, username='root', password='6666'):
        """Connect to ``host:port`` with password authentication."""
        self.client = paramiko.SSHClient()
        # Unknown host keys are accepted automatically (no known_hosts check).
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.client.connect(host, port, username, password)
        except Exception as e:
            logging.error(f"Failed to connect to {host}:{port}. Error: {e}")
            # Release anything the failed attempt may have opened before
            # dropping the reference.
            self.client.close()
            self.client = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def _require_client(self):
        """Return the live paramiko client or raise ``ConnectionError``."""
        if self.client is None:
            raise ConnectionError("SSH connection is not established")
        return self.client

    def exec_command(self, command):
        """Run ``command`` remotely and return ``(stdout, stderr)`` as text.

        Raises:
            ConnectionError: if the connection was never established or has
                already been closed via :meth:`close`.
        """
        client = self._require_client()
        _stdin, stdout, stderr = client.exec_command(command)
        return stdout.read().decode('utf-8'), stderr.read().decode('utf-8')

    def file_operation(self, file_path, mode='r', content=None):
        """Read or write a remote file over SFTP.

        Args:
            file_path: Path of the file on the remote host.
            mode: ``'r'`` to read the file, ``'w'`` to overwrite it.
            content: Data to write when ``mode`` is ``'w'``.

        Returns:
            The decoded file content for ``mode='r'``. ``None`` for writes,
            for any other mode, and when the file operation fails (the
            failure is logged).

        Raises:
            ConnectionError: if the connection was never established.
        """
        client = self._require_client()
        with client.open_sftp() as sftp_client:
            try:
                with sftp_client.open(file_path, mode) as f:
                    if mode == 'r':
                        return f.read().decode('utf-8')
                    elif mode == 'w':
                        f.write(content)
            except Exception as e:
                logging.error(f"Failed to {mode} file {file_path}. Error: {e}")
        return None

    def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.client is not None:
            self.client.close()
            self.client = None

def update_remote_file(host, file_path, old_host, new_port):
    """Rewrite the port that follows ``old_host`` in a base64-encoded remote file.

    The file at ``file_path`` on ``host`` is read over SFTP, base64-decoded,
    every ``old_host:<digits>`` occurrence is changed to
    ``old_host:new_port``, and the result is base64-encoded and written back.

    Args:
        host: Server that stores the file.
        file_path: Path of the base64-encoded file on ``host``.
        old_host: Host name or IP whose port should be replaced.
        new_port: New port number (int or str).

    If the connection or the read fails, the error is logged and the file is
    left untouched.
    """
    ssh_client = SSHClient(host)
    try:
        if ssh_client.client is None:
            logging.error(f"Cannot update {file_path} on {host}: SSH connection failed")
            return
        raw_content = ssh_client.file_operation(file_path)
        if raw_content is None:
            logging.error(f"Cannot update {file_path} on {host}: file could not be read")
            return
        file_content = base64.b64decode(raw_content).decode('utf-8')
        # The lookbehind stops a match from starting in the middle of a longer
        # host (e.g. "1.2.3.4" inside "11.2.3.4" or "a.b" inside "x-a.b").
        pattern = re.compile(r'(?<![\w.-])(' + re.escape(old_host) + r':)\d+')
        # A callable replacement keeps new_port from being parsed as a
        # regex replacement template.
        modified_content, replaced = pattern.subn(
            lambda m: m.group(1) + str(new_port), file_content
        )
        if replaced == 0:
            logging.warning(f"No entry for {old_host} found in {file_path} on {host}")
        encoded_content = base64.b64encode(modified_content.encode('utf-8')).decode('utf-8')
        ssh_client.file_operation(file_path, 'w', encoded_content)
    finally:
        # Always release the SSH connection, even if decoding or writing fails.
        ssh_client.close()

def check_and_update_port(host, config_file_path, update_host, update_file_path):
    """Probe the configured port on ``host`` and move it if it is unreachable.

    The JSON config at ``config_file_path`` is read from ``host`` over SFTP
    and its ``local_port`` value is probed with a TCP connect. If the probe
    fails, ``local_port`` is increased by 5, the config is written back,
    ``systemctl restart trojan-go`` is run on ``host``, and the published
    list on ``update_host`` is updated via ``update_remote_file``.

    Args:
        host: Server whose port is checked.
        config_file_path: Path of the JSON config file on ``host``.
        update_host: Server that stores the published list.
        update_file_path: Path of the list file on ``update_host``.

    Connection and read failures are logged and nothing is changed.
    """
    ssh_client = SSHClient(host)
    try:
        if ssh_client.client is None:
            logging.error(f"Cannot check {host}: SSH connection failed")
            return
        file_content = ssh_client.file_operation(config_file_path)
        if file_content is None:
            logging.error(f"Cannot check {host}: failed to read {config_file_path}")
            return
        config = json.loads(file_content)
        port = config['local_port']

        # The context manager closes the socket even if connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)
            result = sock.connect_ex((host, port))

        if result == 0:
            logging.info(f"Port {port} is open on {host}")
            return

        new_port = port + 5
        if new_port > 65535:
            # Writing an out-of-range port would leave the service unable to start.
            logging.error(f"Port {port} is closed on {host}, but {new_port} is not a valid port")
            return

        config['local_port'] = new_port
        new_file_content = json.dumps(config, indent=4)
        ssh_client.file_operation(config_file_path, 'w', new_file_content)
        output, error = ssh_client.exec_command("systemctl restart trojan-go")
        logging.info(f"Output: {output}")
        if error:
            # Only report stderr when the command actually produced some.
            logging.error(f"Error: {error}")
        logging.info(f"Port {port} is closed on {host}, new port is {config['local_port']}")
    finally:
        # Release the connection to `host` on every path.
        ssh_client.close()

    # Reached only after the port was successfully moved.
    update_remote_file(update_host, update_file_path, host, str(config['local_port']))

# Get URL content
try:
    # A timeout keeps a stalled server from hanging a scheduled run forever.
    response = requests.get('https://domain.com/list.md', timeout=10)
except requests.RequestException as e:
    print(f"Unable to retrieve URL content: {e}")
    raise SystemExit(1) from None
if response.status_code != 200:
    print("Unable to retrieve URL content")
    raise SystemExit(1)

# Base64 decode
try:
    decoded_data = base64.b64decode(response.content).decode('utf-8')
except ValueError as e:
    # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
    print(f"Unable to decode URL content: {e}")
    raise SystemExit(1) from None

# Extract IP and port using regex
ip_port_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b:\d{1,5}')
matches = ip_port_pattern.findall(decoded_data)

if not matches:
    print("Unable to find any IP:port pairs")
    raise SystemExit(1)

# Test and update port status for each IP:port pair
def is_port_open(ip, port):
    """Return True if a TCP connection to ``ip:port`` succeeds within 1 second.

    Args:
        ip: IPv4 address (or host name) to probe.
        port: TCP port number.

    Returns:
        True when the connect attempt succeeds; False when it fails, times
        out, or the address/port cannot be used at all (for example a port
        outside 0-65535).
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
        try:
            return sock.connect_ex((ip, port)) == 0
        except (OSError, OverflowError):
            # connect_ex still raises for unresolvable addresses (OSError)
            # and for out-of-range ports (OverflowError).
            return False

# dict.fromkeys drops duplicate IP:port pairs while keeping first-seen order,
# so the same endpoint is not probed (and possibly bumped) more than once.
for match in dict.fromkeys(matches):
    ip, port_text = match.split(':')
    port = int(port_text)
    try:
        if is_port_open(ip, port):
            print(f"Port {ip}:{port} is open")
            continue
        print(f"Port {ip}:{port} is closed")
        check_and_update_port(ip, '/etc/trojan-go/config.json', 'domain.com', '/www/wwwroot/domain.com/list.md')
    except Exception:
        # One unreachable or misconfigured server must not stop the
        # remaining servers from being checked.
        logging.exception(f"Failed to check or update {ip}:{port}")