Python

Python for Network Scripting

Python is the de facto scripting language for network security. Most security tools either have Python bindings, are written in Python, or produce output that Python can parse. When off-the-shelf tools do not do exactly what you need, a Python script fills the gap.

This guide covers the core Python libraries and patterns for network security scripting - from basic socket programming to scapy packet crafting to automating multi-step security workflows.

Why Python for Security

Python hits a practical sweet spot. It is high-level enough to be productive (no memory management, rich standard library, massive ecosystem) but low-level enough to work with raw packets and binary protocols when needed.

The security ecosystem is heavily Python-based. Scapy, Impacket, Pwntools, Volatility, Burp extensions, Metasploit's Python modules, and hundreds of PoC exploits are all Python. Learning Python is not optional if you work in security - it is a prerequisite.

Python 3 is the only option worth considering. Python 2 reached end-of-life in 2020 and many libraries have dropped support for it. If you find a security script written in Python 2, port it or find an alternative.

Socket Programming Basics

The socket module is part of Python's standard library. It provides low-level networking primitives - TCP and UDP connections, DNS resolution, and raw packet access.

TCP client:

import socket

def tcp_connect(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(3)
    try:
        sock.connect((host, port))
        # Send data
        sock.send(b"GET / HTTP/1.1\r\nHost: " + host.encode() + b"\r\n\r\n")
        # Receive response
        response = sock.recv(4096)
        return response.decode('utf-8', errors='replace')
    finally:
        sock.close()

UDP client:

import socket

def udp_send(host, port, data):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(3)
    sock.sendto(data, (host, port))
    try:
        response, addr = sock.recvfrom(4096)
        return response
    except socket.timeout:
        return None
    finally:
        sock.close()

TCP server (for catching reverse shells or testing):

import socket
import threading

def handle_client(conn, addr):
    print(f"Connection from {addr}")
    data = conn.recv(4096)
    print(f"Received: {data}")
    conn.send(b"ACK")
    conn.close()

def start_server(port):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', port))
    server.listen(5)
    print(f"Listening on port {port}")
    while True:
        conn, addr = server.accept()
        thread = threading.Thread(target=handle_client, args=(conn, addr))
        thread.start()

Building a Port Scanner

graph TD
    subgraph "Port Scanner Architecture"
        A["Target IP + Port Range"] --> B["Thread Pool\nThreadPoolExecutor"]
        B --> C["Worker 1\nPort 1-100"]
        B --> D["Worker 2\nPort 101-200"]
        B --> E["Worker 3\nPort 201-300"]
        B --> F["Worker N\nPort ..."]
        C --> G{"socket.connect()"}
        D --> G
        E --> G
        F --> G
        G -->|Success| H["Port Open\nGrab Banner"]
        G -->|Refused| I["Port Closed"]
        G -->|Timeout| J["Port Filtered"]
        H --> K["Results Collection"]
        I --> K
        J --> K
    end

Multi-threaded port scanner architecture using Python's ThreadPoolExecutor

A basic port scanner demonstrates sockets, threading, and result collection:

import socket
from concurrent.futures import ThreadPoolExecutor, as_completed

def scan_port(host, port, timeout=1):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        result = sock.connect_ex((host, port))
        if result == 0:
            # Try to grab banner
            try:
                sock.send(b'\r\n')
                banner = sock.recv(1024).decode('utf-8', errors='replace').strip()
            except:
                banner = ''
            return port, True, banner
        return port, False, ''
    except:
        return port, False, ''
    finally:
        sock.close()

def scan_host(host, ports=range(1, 1025), threads=100):
    open_ports = []
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = {executor.submit(scan_port, host, port): port for port in ports}
        for future in as_completed(futures):
            port, is_open, banner = future.result()
            if is_open:
                open_ports.append((port, banner))
                print(f"  {port}/tcp open  {banner}")
    return sorted(open_ports)

if __name__ == '__main__':
    import sys
    target = sys.argv[1] if len(sys.argv) > 1 else '127.0.0.1'
    print(f"Scanning {target}")
    results = scan_host(target)

This scanner uses ThreadPoolExecutor for concurrent scanning. 100 threads can scan 1024 ports in a few seconds. The connect_ex() method returns 0 on success instead of raising an exception, which is cleaner for scanning.

Raw Sockets

Raw sockets let you craft packets at the IP level. This requires root privileges.

import socket
import struct

# Create raw socket for receiving
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP)

# Receive a packet
packet = raw_socket.recvfrom(65535)
ip_header = packet[0][:20]
iph = struct.unpack('!BBHHHBBH4s4s', ip_header)

version_ihl = iph[0]
version = version_ihl >> 4
ihl = version_ihl & 0xF
ttl = iph[5]
protocol = iph[6]
src_addr = socket.inet_ntoa(iph[8])
dst_addr = socket.inet_ntoa(iph[9])

print(f"Source: {src_addr}, Dest: {dst_addr}, TTL: {ttl}")

Raw socket programming is tedious because you manually pack and unpack binary headers. For anything beyond simple experiments, use scapy instead - it handles the binary protocol details for you.

Introduction to Scapy

Scapy is the most powerful packet manipulation library available for Python. It lets you build, send, receive, and dissect packets for virtually any protocol. We cover scapy in depth in a separate article, but here is a quick introduction.

Install it:

pip install scapy

Basic packet construction:

from scapy.all import *

# Build an ICMP ping
packet = IP(dst="192.168.1.1") / ICMP()
response = sr1(packet, timeout=2, verbose=0)
if response:
    print(f"Reply from {response.src}, TTL={response.ttl}")

# TCP SYN packet
syn = IP(dst="192.168.1.1") / TCP(dport=80, flags='S')
response = sr1(syn, timeout=2, verbose=0)
if response and response[TCP].flags == 'SA':
    print("Port 80 is open")

# ARP request
arp = ARP(pdst="192.168.1.0/24")
broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")
result = srp(broadcast / arp, timeout=2, verbose=0)[0]
for sent, received in result:
    print(f"{received.psrc} -> {received.hwsrc}")

Scapy's layer stacking with the / operator is intuitive. IP() / TCP() / Raw("data") builds a complete TCP packet with a payload. Each layer handles its own checksums and lengths automatically.

HTTP Scripting with Requests

The requests library is the standard for HTTP in Python. For web application testing, it replaces manual socket HTTP.

import requests

# Basic GET with error handling
try:
    response = requests.get('https://example.com', timeout=10)
    print(f"Status: {response.status_code}")
    print(f"Headers: {dict(response.headers)}")
    print(f"Body length: {len(response.text)}")
except requests.exceptions.RequestException as e:
    print(f"Error: {e}")

# POST with data
data = {'username': 'admin', 'password': 'test'}
response = requests.post('https://example.com/login', data=data)

# Session management (persists cookies)
session = requests.Session()
session.post('https://example.com/login', data=data)
# Subsequent requests use the session cookies
profile = session.get('https://example.com/profile')

# Directory brute forcing
def dir_brute(base_url, wordlist_path):
    with open(wordlist_path) as f:
        for word in f:
            url = f"{base_url}/{word.strip()}"
            try:
                r = requests.get(url, timeout=5)
                if r.status_code != 404:
                    print(f"[{r.status_code}] {url}")
            except:
                pass

For testing that involves SSL certificate issues (self-signed certs, expired certs), use verify=False:

import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
response = requests.get('https://target.local', verify=False)

Parsing Network Data

Security scripting often means parsing output from other tools. Here are common patterns:

Parsing nmap XML output:

import xml.etree.ElementTree as ET

def parse_nmap(xml_file):
    tree = ET.parse(xml_file)
    hosts = []
    for host in tree.findall('.//host'):
        addr = host.find('address').get('addr')
        ports = []
        for port in host.findall('.//port'):
            if port.find('state').get('state') == 'open':
                portid = port.get('portid')
                service = port.find('service')
                name = service.get('name', 'unknown') if service is not None else 'unknown'
                ports.append({'port': portid, 'service': name})
        if ports:
            hosts.append({'ip': addr, 'ports': ports})
    return hosts

Parsing pcap files with scapy:

from scapy.all import rdpcap, TCP, IP

def analyze_pcap(pcap_file):
    packets = rdpcap(pcap_file)
    connections = {}
    for pkt in packets:
        if pkt.haslayer(TCP) and pkt.haslayer(IP):
            src = f"{pkt[IP].src}:{pkt[TCP].sport}"
            dst = f"{pkt[IP].dst}:{pkt[TCP].dport}"
            key = tuple(sorted([src, dst]))
            connections[key] = connections.get(key, 0) + 1
    return connections

Working with binary data (struct module):

import struct

# Parse a DNS header (first 12 bytes)
def parse_dns_header(data):
    fields = struct.unpack('!HHHHHH', data[:12])
    return {
        'id': fields[0],
        'flags': fields[1],
        'questions': fields[2],
        'answers': fields[3],
        'authority': fields[4],
        'additional': fields[5]
    }

Automating Security Tasks

Python excels at chaining security tools together. The subprocess module runs external commands, and you can parse their output for the next step.

import subprocess
import json

def run_nmap(target, ports='1-1024'):
    cmd = ['nmap', '-sV', '-p', ports, '-oX', '-', target]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    return result.stdout

def run_searchsploit(service_name, version):
    cmd = ['searchsploit', '--json', f'{service_name} {version}']
    result = subprocess.run(cmd, capture_output=True, text=True)
    return json.loads(result.stdout)

Automating a reconnaissance workflow:

import subprocess
import time

def recon_workflow(target):
    print(f"[*] Starting recon on {target}")
    
    # Step 1: Quick port scan
    print("[*] Running quick port scan...")
    nmap_result = subprocess.run(
        ['nmap', '-sS', '-T4', '--top-ports', '1000', '-oX', 'quick.xml', target],
        capture_output=True, text=True, timeout=120
    )
    
    # Step 2: Parse results and do detailed scan on open ports
    open_ports = parse_nmap('quick.xml')  # using function from above
    if open_ports:
        port_list = ','.join([p['port'] for p in open_ports[0]['ports']])
        print(f"[*] Deep scanning ports: {port_list}")
        subprocess.run(
            ['nmap', '-sV', '-sC', '-p', port_list, '-oX', 'deep.xml', target],
            capture_output=True, text=True, timeout=300
        )
    
    # Step 3: Check for known vulnerabilities
    for host in open_ports:
        for port_info in host['ports']:
            if port_info['service'] != 'unknown':
                print(f"[*] Checking exploits for {port_info['service']}")
                # ... searchsploit lookup

Practical Scripts

Network monitor - detect new devices:

from scapy.all import ARP, Ether, srp
import time

def get_network_devices(subnet):
    arp = ARP(pdst=subnet)
    broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")
    result = srp(broadcast / arp, timeout=2, verbose=0)[0]
    devices = {}
    for sent, received in result:
        devices[received.hwsrc] = received.psrc
    return devices

def monitor_network(subnet, interval=60):
    known_devices = get_network_devices(subnet)
    print(f"Baseline: {len(known_devices)} devices")
    
    while True:
        time.sleep(interval)
        current = get_network_devices(subnet)
        new_devices = set(current.keys()) - set(known_devices.keys())
        gone_devices = set(known_devices.keys()) - set(current.keys())
        
        for mac in new_devices:
            print(f"[NEW] {mac} -> {current[mac]}")
        for mac in gone_devices:
            print(f"[GONE] {mac} -> {known_devices[mac]}")
        
        known_devices = current

Simple credential tester:

import requests
from concurrent.futures import ThreadPoolExecutor

def test_login(url, username, password):
    data = {'username': username, 'password': password}
    try:
        r = requests.post(url, data=data, timeout=5, allow_redirects=False)
        # Check for success indicators (varies by application)
        if r.status_code == 302 or 'dashboard' in r.text.lower():
            return username, password, True
    except:
        pass
    return username, password, False

Essential Libraries

Beyond the standard library, these packages are worth installing:

Library Purpose Install
scapy Packet crafting and analysis pip install scapy
requests HTTP client pip install requests
paramiko SSH client pip install paramiko
impacket Windows protocol tools (SMB, NTLM, Kerberos) pip install impacket
pwntools CTF and exploit development pip install pwntools
dnspython DNS queries and zone transfers pip install dnspython
python-nmap nmap Python wrapper pip install python-nmap
cryptography Crypto primitives pip install cryptography
beautifulsoup4 HTML parsing pip install beautifulsoup4
pyserial Serial port communication pip install pyserial

If you are working with the BLEShark Nano, pyserial is particularly useful for serial communication, and scapy handles pcap files that the device captures. Building custom automation around your security hardware is one of Python's strongest use cases.

This article is for educational and authorized security testing purposes only. Only test systems you own or have explicit written permission to test.

Get the BLEShark Nano - $36.99+
Back to blog

Leave a comment