summaryrefslogtreecommitdiffstats
path: root/utils/gsmtap_logread.py
blob: a29f14918477a9cd39f5c3a9c2662cdc90d1b72a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python
#
# License: MIT
# Copyright 2019 by Sysmocom s.f.m.c. GmbH
# Author: Alexander Couzens <lynxis@fe80.eu>

import logging
import socket

from gsmtap import GSMTAP_TYPE_OSMOCORE_LOG, gsmtap_hdr, gsmtap_log, TooSmall

LOG = logging.getLogger("gsmlogreader")

def parse_gsm(packet):
    hdr = None

    try:
        hdr = gsmtap_hdr(packet)
    except TooSmall:
        return None

    if hdr.type != GSMTAP_TYPE_OSMOCORE_LOG:
        return None

    if len(packet) <= hdr.hdr_len:
        return None

    try:
        return gsmtap_log(packet[hdr.hdr_len:])
    except TooSmall:
        return None

def gsmtaplevel_to_loglevel(level):
    """ convert a gsmtap log level into a python log level """
    if level <= 1:
        return logging.DEBUG
    if level <= 3:
        return logging.INFO
    if level <= 5:
        return logging.WARNING

    return logging.ERROR

def convert_gsmtap_log(gsmtap):
    level = gsmtaplevel_to_loglevel(gsmtap.level)

    attr = {
        "name": "gsmtap",
        "levelno": level,
        "levelname": gsmtap_get_logname(gsmtap.level),
        "pathname": gsmtap.filename,
        "lineno": gsmtap.fileline_nr,
        "processName": gsmtap.proc_name,
        "process": gsmtap.pid,
        "module": gsmtap.subsys,
        "created": float(gsmtap.sec + gsmtap.usec / 1000000.0),
        "msec": int(gsmtap.usec / 1000),
        "msg": gsmtap.message.replace('\n', ' '),
        }
    return attr

def gsmtap_get_logname(level):
    names = {
        1: "DEBUG",
        3: "INFO",
        5: "NOTICE",
        7: "ERROR",
        8: "FATAL",
        }
    if level in names:
        return names[level]
    return "UNKNOWN"

if __name__ == "__main__":
    # Create a UDP socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_address = ('0.0.0.0', 4729)
    sock.bind(server_address)

    logger = logging.getLogger("gsmtap")
    logformat = "%(asctime)s %(message)s"
    logging.basicConfig(format=logformat, level=logging.DEBUG)


    while True:
        data, address = sock.recvfrom(4096)
        log = parse_gsm(data)
        if not log:
            continue

        record = logging.makeLogRecord(convert_gsmtap_log(log))
        logger.handle(record)