Python for Network Scripting
Python is the de facto scripting language for network security. Most security tools either have Python bindings, are written in Python, or produce output that Python can parse. When off-the-shelf tools do not do exactly what you need, a Python script fills the gap.
This guide covers the core Python libraries and patterns for network security scripting - from basic socket programming to scapy packet crafting to automating multi-step security workflows.
Why Python for Security
Python hits a practical sweet spot. It is high-level enough to be productive (no memory management, rich standard library, massive ecosystem) but low-level enough to work with raw packets and binary protocols when needed.
The security ecosystem is heavily Python-based. Scapy, Impacket, Pwntools, and Volatility are written in Python, Burp Suite accepts Python extensions, Metasploit supports Python modules, and hundreds of PoC exploits are published as Python scripts. Learning Python is not optional if you work in security - it is a prerequisite.
Python 3 is the only option worth considering. Python 2 reached end-of-life in 2020 and many libraries have dropped support for it. If you find a security script written in Python 2, port it or find an alternative.
Socket Programming Basics
The socket module is part of Python's standard library. It provides low-level networking primitives - TCP and UDP connections, DNS resolution, and raw packet access.
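As a small illustration of the DNS resolution mentioned above, the sketch below wraps `socket.getaddrinfo()`. The helper name `resolve` is an assumption made for this example, not something defined elsewhere in this guide.

```python
import socket

def resolve(host, port=None):
    """Return the sorted, de-duplicated IP addresses that `host` resolves to."""
    # getaddrinfo() yields (family, type, proto, canonname, sockaddr) tuples;
    # sockaddr[0] is the address string for both IPv4 and IPv6 results.
    infos = socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP)
    return sorted({info[4][0] for info in infos})
```

Calling `resolve("example.com")` would return whatever addresses your resolver hands back; passing a literal such as `"127.0.0.1"` simply returns that address.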
TCP client:
```python
import socket

def tcp_connect(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(3)
    try:
        sock.connect((host, port))
        # Send data (sendall keeps writing until the whole request is sent)
        sock.sendall(b"GET / HTTP/1.1\r\nHost: " + host.encode() + b"\r\n\r\n")
        # Receive the first chunk of the response (up to 4096 bytes)
        response = sock.recv(4096)
        return response.decode('utf-8', errors='replace')
    finally:
        sock.close()
```
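A single `recv(4096)` call returns at most one chunk, so longer responses are cut off. The sketch below reads until the peer closes the connection or the socket times out. The helper name `recv_all` and the `max_bytes` safety cap are assumptions made for this example.

```python
import socket

def recv_all(sock, chunk_size=4096, max_bytes=1_000_000):
    """Read until the peer closes, the socket times out, or max_bytes is reached."""
    chunks = []
    total = 0
    while total < max_bytes:
        try:
            chunk = sock.recv(chunk_size)
        except socket.timeout:
            break  # nothing more arrived within the socket's timeout
        if not chunk:
            break  # empty bytes means the peer closed its side
        chunks.append(chunk)
        total += len(chunk)
    return b"".join(chunks)
```

In the TCP client, `response = recv_all(sock)` could stand in for the single `recv()` call, at the cost of waiting for the timeout when the server keeps the connection open.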
UDP client:
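```python
import socket

def udp_send(host, port, data):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(3)
    try:
        # Sending inside the try block ensures the socket is closed even if sendto() fails
        sock.sendto(data, (host, port))
        response, addr = sock.recvfrom(4096)
        return response
    except socket.timeout:
        # UDP is connectionless, so silence may mean "no reply" rather than "no service"
        return None
    finally:
        sock.close()
```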
TCP server (for catching reverse shells or testing):
```python
import socket
import threading

def handle_client(conn, addr):
    print(f"Connection from {addr}")
    data = conn.recv(4096)
    print(f"Received: {data}")
    conn.send(b"ACK")
    conn.close()

def start_server(port):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', port))
    server.listen(5)
    print(f"Listening on port {port}")
    while True:
        # Hand each accepted connection to its own thread
        conn, addr = server.accept()
        thread = threading.Thread(target=handle_client, args=(conn, addr))
        thread.start()
```
Building a Port Scanner
```mermaid
graph TD
    subgraph "Port Scanner Architecture"
        A["Target IP + Port Range"] --> B["Thread Pool\nThreadPoolExecutor"]
        B --> C["Worker 1\nPort 1-100"]
        B --> D["Worker 2\nPort 101-200"]
        B --> E["Worker 3\nPort 201-300"]
        B --> F["Worker N\nPort ..."]
        C --> G{"socket.connect()"}
        D --> G
        E --> G
        F --> G
        G -->|Success| H["Port Open\nGrab Banner"]
        G -->|Refused| I["Port Closed"]
        G -->|Timeout| J["Port Filtered"]
        H --> K["Results Collection"]
        I --> K
        J --> K
    end
```
Multi-threaded port scanner architecture using Python's ThreadPoolExecutor
A basic port scanner demonstrates sockets, threading, and result collection:
```python
import socket
from concurrent.futures import ThreadPoolExecutor, as_completed

def scan_port(host, port, timeout=1):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        result = sock.connect_ex((host, port))
        if result == 0:
            # Try to grab banner
            try:
                sock.send(b'\r\n')
                banner = sock.recv(1024).decode('utf-8', errors='replace').strip()
            except OSError:
                # Covers timeouts and resets; many services send no banner
                banner = ''
            return port, True, banner
        return port, False, ''
    except OSError:
        # For example, a hostname that cannot be resolved
        return port, False, ''
    finally:
        sock.close()

def scan_host(host, ports=range(1, 1025), threads=100):
    open_ports = []
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = {executor.submit(scan_port, host, port): port for port in ports}
        for future in as_completed(futures):
            port, is_open, banner = future.result()
            if is_open:
                open_ports.append((port, banner))
                print(f"  {port}/tcp open  {banner}")
    return sorted(open_ports)

if __name__ == '__main__':
    import sys
    target = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'
    print(f"Scanning {target}")
    results = scan_host(target)
```
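This scanner uses ThreadPoolExecutor for concurrent scanning. With the default 1-second timeout, 100 threads work through 1,024 ports in about ten seconds even when most ports are filtered, and faster when closed ports refuse the connection immediately. The connect_ex() method returns an error code (0 on success) instead of raising an exception when the connection fails, which is cleaner for scanning. Note that scan_port() reports refused and timed-out ports alike as not open; it does not separate closed from filtered.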
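The closed/filtered distinction can be recovered from the value `connect_ex()` returns. The following is a minimal sketch under two assumptions: a refused connection shows up as `errno.ECONNREFUSED`, and every other non-zero code (timeouts, unreachable hosts) is treated as "filtered". The function name `classify_port` is made up for this example.

```python
import errno
import socket

def classify_port(host, port, timeout=1.0):
    """Return 'open', 'closed', or 'filtered' for a TCP port using connect_ex()."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        code = sock.connect_ex((host, port))
    if code == 0:
        return "open"
    if code == errno.ECONNREFUSED:
        return "closed"  # the target answered with a reset
    # Timeouts and other errors are lumped together here (an assumption).
    return "filtered"
```

A firewall that actively rejects connections can make a filtered port look closed, so treat the labels as hints rather than ground truth.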
Raw Sockets
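Raw sockets let you craft and inspect packets at the IP level. This requires root privileges (on Linux, the CAP_NET_RAW capability is enough), and the receive example here assumes Linux, where a raw IPPROTO_TCP socket delivers incoming TCP packets with their IP header attached.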
```python
import socket
import struct

# Create raw socket for receiving
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP)

# Receive a packet: recvfrom() returns (data, address)
packet = raw_socket.recvfrom(65535)
ip_header = packet[0][:20]

# Unpack the fixed 20-byte IPv4 header (network byte order)
iph = struct.unpack('!BBHHHBBH4s4s', ip_header)
version_ihl = iph[0]
version = version_ihl >> 4
ihl = version_ihl & 0xF
ttl = iph[5]
protocol = iph[6]
src_addr = socket.inet_ntoa(iph[8])
dst_addr = socket.inet_ntoa(iph[9])

print(f"Source: {src_addr}, Dest: {dst_addr}, TTL: {ttl}")
```
Raw socket programming is tedious because you manually pack and unpack binary headers. For anything beyond simple experiments, use scapy instead - it handles the binary protocol details for you.
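The header-unpacking step can be tried without root by feeding it bytes you build yourself. The sketch below wraps the same `struct` format string in a function and runs it on a hand-packed sample header. The function name `parse_ipv4_header` and the sample addresses are assumptions made for this example.

```python
import socket
import struct

def parse_ipv4_header(data):
    """Decode the fixed 20-byte part of an IPv4 header into a dict."""
    (version_ihl, _tos, total_length, _ident, _flags_frag,
     ttl, protocol, _checksum, src, dst) = struct.unpack('!BBHHHBBH4s4s', data[:20])
    return {
        'version': version_ihl >> 4,
        'header_length': (version_ihl & 0xF) * 4,  # IHL counts 32-bit words
        'total_length': total_length,
        'ttl': ttl,
        'protocol': protocol,
        'src': socket.inet_ntoa(src),
        'dst': socket.inet_ntoa(dst),
    }

# Hand-built sample header (checksum left at 0) standing in for captured bytes.
sample = struct.pack('!BBHHHBBH4s4s', 0x45, 0, 40, 1, 0, 64, socket.IPPROTO_TCP, 0,
                     socket.inet_aton('10.0.0.1'), socket.inet_aton('10.0.0.2'))
header = parse_ipv4_header(sample)
```

The `header_length` field matters in practice: when it is greater than 20, IP options are present and the TCP header starts later than byte 20.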
Introduction to Scapy
Scapy is the most powerful packet manipulation library available for Python. It lets you build, send, receive, and dissect packets for virtually any protocol. We cover scapy in depth in a separate article, but here is a quick introduction.
Install it:
pip install scapy
Basic packet construction:
```python
from scapy.all import ARP, ICMP, IP, TCP, Ether, sr1, srp

# Build an ICMP ping
packet = IP(dst="192.168.1.1") / ICMP()
response = sr1(packet, timeout=2, verbose=0)
if response:
    print(f"Reply from {response.src}, TTL={response.ttl}")

# TCP SYN packet
syn = IP(dst="192.168.1.1") / TCP(dport=80, flags='S')
response = sr1(syn, timeout=2, verbose=0)
# The reply may be an ICMP error with no TCP layer, so check before indexing
if response and response.haslayer(TCP) and response[TCP].flags == 'SA':
    print("Port 80 is open")

# ARP request
arp = ARP(pdst="192.168.1.0/24")
broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")
result = srp(broadcast / arp, timeout=2, verbose=0)[0]
for sent, received in result:
    print(f"{received.psrc} -> {received.hwsrc}")
```
Scapy's layer stacking with the / operator is intuitive. IP() / TCP() / Raw("data") builds a complete TCP packet with a payload. Each layer handles its own checksums and lengths automatically.
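To give a sense of the bookkeeping this saves, here is a pure-Python sketch of the 16-bit one's-complement checksum (RFC 1071) used in IPv4 headers. It is an illustration of the algorithm only, not scapy's own code, and the function name `internet_checksum` is an assumption made for this example.

```python
def internet_checksum(data: bytes) -> int:
    """Compute the 16-bit one's-complement checksum described in RFC 1071."""
    if len(data) % 2:
        data += b'\x00'  # pad odd-length input with a zero byte
    total = 0
    for i in range(0, len(data), 2):
        total += (data[i] << 8) | data[i + 1]
    # Fold any carries above 16 bits back into the low 16 bits.
    while total >> 16:
        total = (total & 0xFFFF) + (total >> 16)
    return ~total & 0xFFFF

# Example IPv4 header with its checksum field (bytes 10-11) zeroed out.
header = bytes.fromhex('450000730000400040110000c0a80001c0a800c7')
checksum = internet_checksum(header)
```

A receiver validates a header by running the same function over the header with the checksum filled in and checking that the result is zero.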
HTTP Scripting with Requests
The requests library is the standard for HTTP in Python. For web application testing, it replaces manual socket HTTP.
```python
import requests

# Basic GET with error handling
try:
    response = requests.get('https://example.com', timeout=10)
    print(f"Status: {response.status_code}")
    print(f"Headers: {dict(response.headers)}")
    print(f"Body length: {len(response.text)}")
except requests.exceptions.RequestException as e:
    print(f"Error: {e}")

# POST with data (requests has no default timeout, so always set one)
data = {'username': 'admin', 'password': 'test'}
response = requests.post('https://example.com/login', data=data, timeout=10)

# Session management (persists cookies)
session = requests.Session()
session.post('https://example.com/login', data=data, timeout=10)
# Subsequent requests use the session cookies
profile = session.get('https://example.com/profile', timeout=10)

# Directory brute forcing
def dir_brute(base_url, wordlist_path):
    with open(wordlist_path) as f:
        for word in f:
            word = word.strip()
            if not word:
                continue  # skip blank lines in the wordlist
            url = f"{base_url}/{word}"
            try:
                r = requests.get(url, timeout=5)
                if r.status_code != 404:
                    print(f"[{r.status_code}] {url}")
            except requests.exceptions.RequestException:
                pass  # ignore unreachable or timed-out URLs
```
When a test target has a self-signed or expired TLS certificate, pass verify=False to skip certificate validation, and suppress the warning urllib3 emits for each unverified request:
```python
import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
response = requests.get('https://target.local', verify=False)
```
Parsing Network Data
Security scripting often means parsing output from other tools. Here are common patterns:
Parsing nmap XML output:
```python
import xml.etree.ElementTree as ET

def parse_nmap(xml_file):
    tree = ET.parse(xml_file)
    hosts = []
    for host in tree.findall('.//host'):
        addr = host.find('address').get('addr')
        ports = []
        for port in host.findall('.//port'):
            if port.find('state').get('state') == 'open':
                portid = port.get('portid')
                service = port.find('service')
                name = service.get('name', 'unknown') if service is not None else 'unknown'
                ports.append({'port': portid, 'service': name})
        if ports:
            hosts.append({'ip': addr, 'ports': ports})
    return hosts
```
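To experiment with this kind of parsing without running a scan, you can feed the parser an inline XML string. The sketch below is a self-contained variant. The sample document is a hypothetical, heavily trimmed imitation of `nmap -oX` output, and the function name `open_ports_from_xml` is an assumption made for this example.

```python
import xml.etree.ElementTree as ET

# Hypothetical, heavily trimmed sample in the shape of `nmap -oX` output.
SAMPLE_XML = """<nmaprun>
  <host>
    <address addr="192.0.2.10" addrtype="ipv4"/>
    <ports>
      <port protocol="tcp" portid="22"><state state="open"/><service name="ssh"/></port>
      <port protocol="tcp" portid="80"><state state="closed"/><service name="http"/></port>
      <port protocol="tcp" portid="8080"><state state="open"/></port>
    </ports>
  </host>
</nmaprun>"""

def open_ports_from_xml(xml_text):
    """Map each host address to a list of (port, service) tuples for open ports."""
    root = ET.fromstring(xml_text)
    results = {}
    for host in root.iter('host'):
        address = host.find('address')
        if address is None:
            continue
        for port in host.iter('port'):
            state = port.find('state')
            if state is None or state.get('state') != 'open':
                continue
            service = port.find('service')
            name = service.get('name', 'unknown') if service is not None else 'unknown'
            results.setdefault(address.get('addr'), []).append(
                (int(port.get('portid')), name))
    return results

hosts = open_ports_from_xml(SAMPLE_XML)
```

Checking `state` and `address` for `None` before use keeps the parser from crashing on partial or interrupted scan files.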
Parsing pcap files with scapy:
```python
from scapy.all import rdpcap, TCP, IP

def analyze_pcap(pcap_file):
    packets = rdpcap(pcap_file)
    connections = {}
    for pkt in packets:
        if pkt.haslayer(TCP) and pkt.haslayer(IP):
            src = f"{pkt[IP].src}:{pkt[TCP].sport}"
            dst = f"{pkt[IP].dst}:{pkt[TCP].dport}"
            # Sort the endpoints so both directions map to the same key
            key = tuple(sorted([src, dst]))
            connections[key] = connections.get(key, 0) + 1
    return connections
```
Working with binary data (struct module):
```python
import struct

# Parse a DNS header (first 12 bytes)
def parse_dns_header(data):
    fields = struct.unpack('!HHHHHH', data[:12])
    return {
        'id': fields[0],
        'flags': fields[1],
        'questions': fields[2],
        'answers': fields[3],
        'authority': fields[4],
        'additional': fields[5]
    }
```
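The same `struct` approach works in the other direction. The sketch below packs a minimal DNS query and splits the 16-bit flags field into a few commonly inspected bits. The function names `build_dns_query` and `decode_dns_flags` and the fixed query ID are assumptions made for this example.

```python
import struct

def build_dns_query(name, query_id=0x1234, qtype=1):
    """Build a minimal DNS query for `name` (qtype 1 = A record, class IN)."""
    # Header: ID, flags (0x0100 = recursion desired), 1 question, 0 other records.
    header = struct.pack('!HHHHHH', query_id, 0x0100, 1, 0, 0, 0)
    # QNAME: each label is prefixed with its length, terminated by a zero byte.
    qname = b''.join(
        bytes([len(label)]) + label.encode('ascii')
        for label in name.rstrip('.').split('.')
    ) + b'\x00'
    question = qname + struct.pack('!HH', qtype, 1)
    return header + question

def decode_dns_flags(flags):
    """Split the 16-bit flags field into a few commonly inspected parts."""
    return {
        'is_response': bool(flags & 0x8000),
        'opcode': (flags >> 11) & 0xF,
        'recursion_desired': bool(flags & 0x0100),
        'rcode': flags & 0xF,
    }

query = build_dns_query('example.com')
```

The resulting bytes could be passed to a UDP send helper aimed at port 53 of a resolver you are authorized to query, and the first 12 bytes of the reply decoded with a header parser.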
Automating Security Tasks
Python excels at chaining security tools together. The subprocess module runs external commands, and you can parse their output for the next step.
```python
import subprocess
import json

def run_nmap(target, ports='1-1024'):
    # '-oX -' writes the XML report to stdout so it can be captured
    cmd = ['nmap', '-sV', '-p', ports, '-oX', '-', target]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    return result.stdout

def run_searchsploit(service_name, version):
    cmd = ['searchsploit', '--json', f'{service_name} {version}']
    result = subprocess.run(cmd, capture_output=True, text=True)
    return json.loads(result.stdout)
```
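External tools fail in ways the wrappers above do not handle: the binary may be missing, the run may time out, or the tool may exit with an error. The sketch below shows one way to fold those cases into a single helper. The name `run_tool` and the choice to return `None` on every failure are assumptions made for this example.

```python
import subprocess
import sys

def run_tool(cmd, timeout=60):
    """Run an external command and return its stdout, or None on any failure."""
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    except FileNotFoundError:
        print(f"[!] {cmd[0]} is not installed or not on PATH")
        return None
    except subprocess.TimeoutExpired:
        print(f"[!] {cmd[0]} timed out after {timeout}s")
        return None
    if result.returncode != 0:
        print(f"[!] {cmd[0]} exited with {result.returncode}: {result.stderr.strip()}")
        return None
    return result.stdout

# Demonstrated with the Python interpreter itself so the example runs anywhere.
output = run_tool([sys.executable, '-c', 'print("ok")'])
```

Passing the command as a list rather than a shell string also avoids shell injection when a target name comes from user input.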
Automating a reconnaissance workflow:
```python
import subprocess

def recon_workflow(target):
    print(f"[*] Starting recon on {target}")

    # Step 1: Quick port scan (-sS is a SYN scan and needs root privileges)
    print("[*] Running quick port scan...")
    nmap_result = subprocess.run(
        ['nmap', '-sS', '-T4', '--top-ports', '1000', '-oX', 'quick.xml', target],
        capture_output=True, text=True, timeout=120
    )

    # Step 2: Parse results and do detailed scan on open ports
    open_ports = parse_nmap('quick.xml')  # parse_nmap() from the nmap XML parsing example
    if open_ports:
        # Only one target was scanned, so the first host entry holds its ports
        port_list = ','.join([p['port'] for p in open_ports[0]['ports']])
        print(f"[*] Deep scanning ports: {port_list}")
        subprocess.run(
            ['nmap', '-sV', '-sC', '-p', port_list, '-oX', 'deep.xml', target],
            capture_output=True, text=True, timeout=300
        )

    # Step 3: Check for known vulnerabilities
    for host in open_ports:
        for port_info in host['ports']:
            if port_info['service'] != 'unknown':
                print(f"[*] Checking exploits for {port_info['service']}")
                # ... searchsploit lookup
```
Practical Scripts
Network monitor - detect new devices:
```python
from scapy.all import ARP, Ether, srp
import time

def get_network_devices(subnet):
    arp = ARP(pdst=subnet)
    broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")
    result = srp(broadcast / arp, timeout=2, verbose=0)[0]
    devices = {}
    for sent, received in result:
        # Map MAC address -> IP address
        devices[received.hwsrc] = received.psrc
    return devices

def monitor_network(subnet, interval=60):
    known_devices = get_network_devices(subnet)
    print(f"Baseline: {len(known_devices)} devices")
    while True:
        time.sleep(interval)
        current = get_network_devices(subnet)
        new_devices = set(current.keys()) - set(known_devices.keys())
        gone_devices = set(known_devices.keys()) - set(current.keys())
        for mac in new_devices:
            print(f"[NEW] {mac} -> {current[mac]}")
        for mac in gone_devices:
            print(f"[GONE] {mac} -> {known_devices[mac]}")
        known_devices = current
```
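The comparison step in the monitor can be pulled out into a pure function, which makes it testable without sending ARP traffic. The sketch below also reports devices whose IP changed between snapshots, which the monitor above does not track. The function name `diff_devices` and the sample addresses are assumptions made for this example.

```python
def diff_devices(known, current):
    """Compare two {mac: ip} snapshots and report what changed."""
    new = {mac: ip for mac, ip in current.items() if mac not in known}
    gone = {mac: ip for mac, ip in known.items() if mac not in current}
    # Same MAC, different IP: often a DHCP lease change, worth logging separately.
    moved = {
        mac: (known[mac], ip)
        for mac, ip in current.items()
        if mac in known and known[mac] != ip
    }
    return new, gone, moved

before = {'aa:aa:aa:aa:aa:01': '192.168.1.10', 'aa:aa:aa:aa:aa:02': '192.168.1.11'}
after = {'aa:aa:aa:aa:aa:02': '192.168.1.20', 'aa:aa:aa:aa:aa:03': '192.168.1.12'}
new, gone, moved = diff_devices(before, after)
```

A single missed ARP reply makes a device look "gone", so a real monitor might require several consecutive misses before reporting it.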
Simple credential tester:
```python
import requests
from concurrent.futures import ThreadPoolExecutor

def test_login(url, username, password):
    data = {'username': username, 'password': password}
    try:
        r = requests.post(url, data=data, timeout=5, allow_redirects=False)
        # Check for success indicators (varies by application)
        if r.status_code == 302 or 'dashboard' in r.text.lower():
            return username, password, True
    except requests.exceptions.RequestException:
        pass  # treat connection errors and timeouts as a failed attempt
    return username, password, False
```
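The credential tester imports ThreadPoolExecutor but never uses it. One way the fan-out might look is sketched below, with the actual check passed in as a callable so the example runs without a network. The names `find_valid_passwords` and `fake_check` are assumptions made for this example; a real checker would wrap an HTTP request against a system you are authorized to test.

```python
from concurrent.futures import ThreadPoolExecutor

def find_valid_passwords(check, username, passwords, workers=5):
    """Run check(username, password) -> bool across candidates in a thread pool."""
    passwords = list(passwords)  # materialize so the list can be walked twice
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # map() preserves input order, so results line up with the wordlist.
        outcomes = executor.map(lambda pw: check(username, pw), passwords)
        return [pw for pw, ok in zip(passwords, outcomes) if ok]

# Stand-in checker for demonstration only.
def fake_check(username, password):
    return (username, password) == ('admin', 'hunter2')

hits = find_valid_passwords(fake_check, 'admin', ['test', 'hunter2', 'letmein'])
```

Keeping `workers` low is deliberate: many applications rate-limit or lock accounts after repeated failures.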
Essential Libraries
Beyond the standard library, these packages are worth installing:
| Library | Purpose | Install |
|---|---|---|
| scapy | Packet crafting and analysis | pip install scapy |
| requests | HTTP client | pip install requests |
| paramiko | SSH client | pip install paramiko |
| impacket | Windows protocol tools (SMB, NTLM, Kerberos) | pip install impacket |
| pwntools | CTF and exploit development | pip install pwntools |
| dnspython | DNS queries and zone transfers | pip install dnspython |
| python-nmap | nmap Python wrapper | pip install python-nmap |
| cryptography | Crypto primitives | pip install cryptography |
| beautifulsoup4 | HTML parsing | pip install beautifulsoup4 |
| pyserial | Serial port communication | pip install pyserial |
If you are working with the BLEShark Nano, pyserial is particularly useful for serial communication, and scapy handles pcap files that the device captures. Building custom automation around your security hardware is one of Python's strongest use cases.
This article is for educational and authorized security testing purposes only. Only test systems you own or have explicit written permission to test.