lgsm local mirror
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

211 lines
6.3 KiB

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# LinuxGSM query_gsquery.py module
# Author: Daniel Gibbs
# Contributors: https://linuxgsm.com/contrib
# Website: https://linuxgsm.com
# Description: Allows querying of various game servers.
"""Query game servers using a set of lightweight UDP protocols."""
import argparse
import socket
import sys
engine_types = (
"protocol-valve",
"protocol-quake2",
"protocol-quake3",
"protocol-gamespy1",
"protocol-unreal2",
"ut3",
"minecraft",
"minecraftbe",
"jc2m",
"mumbleping",
"soldat",
"teeworlds",
)
class GSQuery:
"""Game server query dispatcher."""
server_response_timeout = 2
default_buffer_length = 1024
sourcequery = (
"protocol-valve",
"avalanche3.0",
"barotrauma",
"madness",
"quakelive",
"realvirtuality",
"refractor",
"source",
"goldsrc",
"spark",
"starbound",
"unity3d",
"unreal4",
"wurm",
)
idtech2query = ("protocol-quake2", "idtech2", "quake", "iw2.0")
idtech3query = ("protocol-quake3", "iw3.0", "ioquake3", "qfusion")
minecraftquery = ("minecraft", "lwjgl2")
minecraftbequery = ("minecraftbe",)
jc2mquery = ("jc2m",)
mumblequery = ("mumbleping",)
soldatquery = ("soldat",)
twquery = ("teeworlds",)
unrealquery = ("protocol-gamespy1", "unreal")
unreal2query = ("protocol-unreal2", "unreal2")
unreal3query = ("ut3", "unreal3")
def __init__(self, arguments):
"""Create a query instance from parsed CLI args."""
self.argument = arguments
#
if self.argument.engine in self.sourcequery:
self.query_prompt_string = b"\xff\xff\xff\xffTSource Engine Query\0"
elif self.argument.engine in self.idtech2query:
self.query_prompt_string = b"\xff\xff\xff\xffstatus\x00"
elif self.argument.engine in self.idtech3query:
self.query_prompt_string = b"\xff\xff\xff\xffgetstatus"
elif self.argument.engine in self.jc2mquery:
self.query_prompt_string = b"\xfe\xfd\x09\x10\x20\x30\x40"
elif self.argument.engine in self.minecraftquery:
self.query_prompt_string = b"\xfe\xfd\x09\x3d\x54\x1f\x93"
elif self.argument.engine in self.minecraftbequery:
self.query_prompt_string = (
b"\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00"
b"\xfe\xfe\xfe\xfe\xfd\xfd\xfd\xfd\x12\x34\x56\x78"
b"\x00\x00\x00\x00\x00\x00\x00\x00"
)
elif self.argument.engine in self.mumblequery:
self.query_prompt_string = (
b"\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08"
)
elif self.argument.engine in self.soldatquery:
self.query_prompt_string = b"\x69\x00"
elif self.argument.engine in self.twquery:
self.query_prompt_string = b"\x04\x00\x00\xff\xff\xff\xff\x05" + bytearray(
511
)
elif self.argument.engine in self.unrealquery:
self.query_prompt_string = b"\x5c\x69\x6e\x66\x6f\x5c"
elif self.argument.engine in self.unreal2query:
self.query_prompt_string = b"\x79\x00\x00\x00\x00"
elif self.argument.engine in self.unreal3query:
self.query_prompt_string = b"\xfe\xfd\x09\x00\x00\x00\x00"
self.connected = False
self.response = None
@staticmethod
def fatal_error(error_message, error_code=1):
"""Write an error message to stderr and exit."""
sys.stderr.write("ERROR: " + str(error_message) + "\n")
sys.exit(error_code)
@staticmethod
def exit_success(success_message=""):
"""Write a success message to stdout and exit."""
sys.stdout.write("OK: " + str(success_message) + "\n")
sys.exit(0)
def responding(self):
"""Send a single UDP query and print the response."""
# Connect.
connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
connection.settimeout(self.server_response_timeout)
try:
self.connected = connection.connect(
(self.argument.address, int(self.argument.port))
)
except socket.timeout:
self.fatal_error("Request timed out", 1)
except OSError:
self.fatal_error("Unable to connect", 1)
# Send.
connection.send(self.query_prompt_string)
# Receive.
try:
self.response = connection.recv(self.default_buffer_length)
except socket.error:
self.fatal_error("Unable to receive", 2)
connection.close()
# Response.
if self.response is None:
self.fatal_error("No response", 3)
if len(self.response) < 5:
sys.exit("Short response.", 3)
else:
self.exit_success(str(self.response))
def parse_args():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description="Allows querying of various game servers.",
usage="usage: python3 %(prog)s [options]",
add_help=False,
)
parser.add_argument(
"-a",
"--address",
type=str,
required=True,
help="The IPv4 address of the server.",
)
parser.add_argument(
"-p",
"--port",
type=int,
required=True,
help="The IPv4 port of the server.",
)
parser.add_argument(
"-e",
"--engine",
metavar="ENGINE",
choices=engine_types,
help="Engine type: " + " ".join(engine_types),
)
parser.add_argument(
"-v",
"--verbose",
action="store_true",
help="Display verbose output.",
)
parser.add_argument(
"-d",
"--debug",
action="store_true",
help="Display debugging output.",
)
parser.add_argument(
"-V",
"--version",
action="version",
version="%(prog)s 0.0.1",
help="Display version and exit.",
)
parser.add_argument("-h", "--help", action="help", help="Display help and exit.")
return parser.parse_args()
def main():
"""CLI entrypoint."""
arguments = parse_args()
server = GSQuery(arguments)
server.responding()
if __name__ == "__main__":
main()