Summary

A discussion of reading data out of stream of bytes (encoded in a C-like structure) using the Python ctypes module.  The data in the stream is a UDP packet that represents an mDNS query or request.  The purpose of this article is to explain a process for decoding bytes streams in Python

Story

While I was working on A-Class Linux implementations I fell down the rabbit hole of mDNS.  mDNS is a part of the set of protocols that make up “Zero Configuration Networking”.  In order to understand the protocol I decided to implement (partially) an mDNS server.  You can read about that protocol and my implementation – when I get done :-).  However, all of that isn’t really important to this article, but it did bring me to dig into techniques for examining bytes in Python.

I doubt that this article is canonical, but I hope that it is at least useful.  I did find quite a few partial discussions of this topic, but I had to dig into to really understand.

Python Comment
bytes A built in object to represent an immutable sequence of single bytes.
bytearray A built in object to represent a mutable sequence of single bytes.
struct A module to encode and decode bytes from c-like structures (unfortunately the byte is an atomic unit of the struct module)
ctypes A module to interface to C functions and data.  It contains a bunch of classes which can be used to interface with C-Structures (like the struct module)

There are bunches of web hits on this topic.  However, here are a few which I found useful.

Link Comment
link A basic discussion of the ctypes module and the basic classes
link A discussion of the ctypes.sizeof function
link A discussion of the bytearray
link A Better Way to Work with Raw Data Types in Python

The UDP Header for a mDNS Packet

The IETF RFC 6895 documents the header format for mDNS (and DNS) packets.  The header contains data in Big Endian format encoded into 12 bytes that are broken up into bits, several-bits, and a few 16-bit integers.  Here is snapshot from the RFC.

 

A Red Herring

OK, I admit it.  I am a C-Programmer from way back.  My first inclination to decode the bytes looked like this:

  • Using shifts and or’s to assemble the bytes into big endian uint16s e.g. line 2
  • Using bit masks and logic “and” with or’s and shifts to pick out bit fields e.g. line 12
  • Using a tower of if/elif/elif/else to decode the individual values e.g. lines

Here is my first crack at this.

    id = message[0] << 8 | message[1]
    print(f"Id = {id}")
   
    QRFlag = (message[2] & 0x80) >> 7
    if QRFlag == 0:
        QRFlagText = "QUERY"
    else:
        QRFlagText = "RESPONSE"
        
    print(f"Query Flag = {QRFlagText}")

    opCode = (0b01111000 & message[2]) >> 3
    if opCode == 0:
        opCodeText = "Query"
    elif opCode == 1:
        opCodeText = "IQUERY"
    elif opCode == 2:
        opCodeText = "Status"
    elif opCode == 3:
        opCodeText = "Reserved"
    elif opCode == 4:
        opCodeText = "Notify"
    elif opCode == 5:
        opCodeText = "Update"
    else :
        opCodeText = "Unknown"
   
    print(f"Opcode ={opCode} {opCodeText}")

    AAFlag = (message[2] & 0b00000100) >> 2
    AAFlagText = "Authoritative" if AAFlag == 1 else "Non-Authoritative"
    print(f"AA Flag = {AAFlag} {AAFlagText}")

    TCFlag = (message[2] & 0b00000010) >> 1
    TCFlagText = "Truncation" if TCFlag == 1 else "No Truncation"
    print(f"TCFlag = {TCFlag} {TCFlagText}")

    RDFlag = message[2] & 0b00000001
    RDFlagText = "Recursion" if RDFlag == 1 else "No Recursion"
    print(f"RDFlag = {RDFlag} {RDFlagText}")

    RAFlag = (message[3] & 0b10000000) >> 7
    RAFlagText = "Recursion Available" if RDFlag == 1 else "No Recursion Available"
    print(f"RAFlag = {RAFlag} {RAFlagText}")

    ZFlag =  (message[3] & 0b01110000) >> 4
    print(f"Reserved ZFlag = {ZFlag}")

    RCCode = message[3] & 0b00001111
    if RCCode == 0:
        RCCodeText = "No Error"
    elif RCCode == 1:
        RCCodeText == "Format Error"
    elif RCCode == 2:
        RCCodeText == "Server Failure"
    elif RCCode == 3:
        RCCodeText == "Name Error"
    elif RCCode == 4:
        RCCodeText == "Not Implemented"
    elif RCCode == 5:
        RCCodeText == "Refused"
    elif RCCode == 6:
        RCCodeText == "Yx Domain"
    elif RCCode == 7:
        RCCodeText == "YX RR Set"
    elif RCCode == 8:
        RCCodeText == "NX RR Set"
    elif RCCode == 9:
        RCCodeText == "Not Authorized"
    elif RCCode == 10:
        RCCodeText == "Not Zone"

    print(f"RCCode = {ZFlag} {RCCodeText}")


    QDCount = message[4]<<8  | message[5]
    ANCount = message[6]<<8  | message[7]
    NSCount = message[8]<<8  | message[9]
    ARCount = message[10]<<8 | message[11]
    print(f"Questions = {QDCount} Answers = {ANCount} Name Servers = {NSCount} Additional Records = {ARCount}")

Encoding a c-structure with Bits

I didn’t really like the above implementation.  So I kept digging.  After a while I found the ctypes module.  This lets you

  • Derive a new class from the BigEndianStructure class (line 1)
  • Pack all of the bits and bytes next to each other (line 2)
  • Specify the field names, type and optionally the length in bits (line
class dnsHeader(ctypes.BigEndianStructure):
    _pack_ = 1
    _fields_ = [    ("id",ctypes.c_uint,16),
                    ("qr",ctypes.c_uint,1),
                    ("opcode",ctypes.c_uint,4),
                    ("aa",ctypes.c_uint,1),
                    ("tc",ctypes.c_uint,1),
                    ("rd",ctypes.c_uint,1),
                    ("ra",ctypes.c_uint,1),
                    ("z",ctypes.c_uint,3),
                    ("rcode",ctypes.c_uint,4),
                    ("qdcount",ctypes.c_uint16),
                    ("ancount",ctypes.c_uint16),
                    ("nscount",ctypes.c_uint16),
                    ("arcount",ctypes.c_uint16),

    ]

When you receive data from a socket you will get a tuple that contains

  1. a “bytes” type object containing the raw bytes of the message
  2. a “tuple” containing the IP address (not relevant to this discussion)
    (message, address) = UDPServerSocket.recvfrom(bufferSize)

Now that you have the bytes you can create an object of dnsHeader type to interpret the bytes.  The ctypes class method “from_buffer_copy” will take an array of bytes that is at least the length of the structure and return an object of the type of “dnsHeader”.

    dnsh = dnsHeader.from_buffer_copy(message)

Then you can look at the individual fields like this:

print(f"id = {self.id}")

Recommended Posts

No comment yet, add your voice below!


Add a Comment

Your email address will not be published. Required fields are marked *