close
Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions src/ipdata/iptrie.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

from __future__ import annotations

import ipaddress
import socket
from collections.abc import Iterator
from typing import TypeVar, Generic
Expand Down Expand Up @@ -53,18 +52,40 @@ def _normalize_ip_key(key: str) -> tuple[str, bool]:
if not key:
raise InvalidIPError("Key cannot be empty")

addr = key.rsplit("/", 1)[0] if "/" in key else key

if "/" in key:
addr_str, sep, prefix_str = key.rpartition("/")
try:
prefix_len = int(prefix_str)
except ValueError:
raise InvalidIPError(f"Invalid IP address or network: {key!r}")
else:
addr_str = key
prefix_len = -1

# Try IPv4
try:
socket.inet_pton(socket.AF_INET, addr)
return key, False
except socket.error:
packed = socket.inet_pton(socket.AF_INET, addr_str)
if prefix_len >= 0:
if prefix_len > 32:
raise InvalidIPError(f"Invalid IP address or network: {key!r}")
mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF
masked = (int.from_bytes(packed, "big") & mask).to_bytes(4, "big")
return f"{socket.inet_ntop(socket.AF_INET, masked)}/{prefix_len}", False
return addr_str, False
except OSError:
pass

# Try IPv6
try:
socket.inet_pton(socket.AF_INET6, addr)
return key, True
except socket.error:
packed = socket.inet_pton(socket.AF_INET6, addr_str)
if prefix_len >= 0:
if prefix_len > 128:
raise InvalidIPError(f"Invalid IP address or network: {key!r}")
mask = ((1 << 128) - 1) << (128 - prefix_len)
masked = (int.from_bytes(packed, "big") & mask).to_bytes(16, "big")
return f"{socket.inet_ntop(socket.AF_INET6, masked)}/{prefix_len}", True
return socket.inet_ntop(socket.AF_INET6, packed), True
except OSError:
raise InvalidIPError(f"Invalid IP address or network: {key!r}")


Expand Down