1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
#!/usr/bin/env python
#
# License: MIT
# Copyright 2019 by Sysmocom s.f.m.c. GmbH
# Author: Alexander Couzens <lynxis@fe80.eu>
import logging
import socket
from gsmtap import GSMTAP_TYPE_OSMOCORE_LOG, gsmtap_hdr, gsmtap_log, TooSmall
LOG = logging.getLogger("gsmlogreader")
def parse_gsm(packet):
    """Extract the log payload from a raw GSMTAP packet.

    The packet is first parsed with ``gsmtap_hdr``; the bytes following
    ``hdr_len`` are then handed to ``gsmtap_log``.

    Returns the ``gsmtap_log`` result, or ``None`` when
      * either parser raises ``TooSmall``,
      * the header type is not ``GSMTAP_TYPE_OSMOCORE_LOG``, or
      * the packet holds nothing beyond the header.
    """
    try:
        header = gsmtap_hdr(packet)
    except TooSmall:
        return None

    # Only log-type packets that actually carry a payload are of interest.
    if header.type != GSMTAP_TYPE_OSMOCORE_LOG or len(packet) <= header.hdr_len:
        return None

    try:
        parsed = gsmtap_log(packet[header.hdr_len:])
    except TooSmall:
        parsed = None
    return parsed
def gsmtaplevel_to_loglevel(level):
    """Translate a gsmtap log level into a python ``logging`` level.

    Levels up to 1 become DEBUG, up to 3 INFO, up to 5 WARNING; anything
    above 5 becomes ERROR.
    """
    # (inclusive upper bound, python level), checked in ascending order.
    thresholds = (
        (1, logging.DEBUG),
        (3, logging.INFO),
        (5, logging.WARNING),
    )
    for upper_bound, py_level in thresholds:
        if level <= upper_bound:
            return py_level
    return logging.ERROR
def convert_gsmtap_log(gsmtap):
    """Build the attribute dict for ``logging.makeLogRecord`` from a gsmtap log.

    ``gsmtap`` must provide ``level``, ``filename``, ``fileline_nr``,
    ``proc_name``, ``pid``, ``subsys``, ``sec``, ``usec`` and ``message``.

    Returns a dict keyed by ``LogRecord`` attribute names. Newlines in the
    message are replaced by spaces so each record stays on one line.
    """
    # Millisecond part of the sender's timestamp.
    millis = int(gsmtap.usec / 1000)
    return {
        "name": "gsmtap",
        "levelno": gsmtaplevel_to_loglevel(gsmtap.level),
        "levelname": gsmtap_get_logname(gsmtap.level),
        "pathname": gsmtap.filename,
        "lineno": gsmtap.fileline_nr,
        "processName": gsmtap.proc_name,
        "process": gsmtap.pid,
        "module": gsmtap.subsys,
        "created": float(gsmtap.sec + gsmtap.usec / 1000000.0),
        # The LogRecord attribute read when formatting %(asctime)s is
        # "msecs". Without it the record keeps the milliseconds of the
        # local wall clock while "created" carries the sender's seconds.
        "msecs": millis,
        # Historic (misspelled) key, kept so existing consumers of this
        # dict keep working.
        "msec": millis,
        "msg": gsmtap.message.replace('\n', ' '),
    }
def gsmtap_get_logname(level):
    """Return the textual name of a gsmtap log level.

    Levels without a known name yield ``"UNKNOWN"``.
    """
    level_names = {
        1: "DEBUG",
        3: "INFO",
        5: "NOTICE",
        7: "ERROR",
        8: "FATAL",
    }
    return level_names.get(level, "UNKNOWN")
if __name__ == "__main__":
    # Receive GSMTAP datagrams on UDP port 4729 (all IPv4 interfaces) and
    # replay every contained log message through the python "gsmtap" logger.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        server_address = ('0.0.0.0', 4729)
        sock.bind(server_address)

        logger = logging.getLogger("gsmtap")
        logformat = "%(asctime)s %(message)s"
        logging.basicConfig(format=logformat, level=logging.DEBUG)

        while True:
            # The sender address is not needed.
            data, _address = sock.recvfrom(4096)
            log = parse_gsm(data)
            if not log:
                # Not a (valid) log packet; skip it.
                continue
            record = logging.makeLogRecord(convert_gsmtap_log(log))
            logger.handle(record)
    finally:
        # Release the socket on every exit path (Ctrl-C, bind failure, ...).
        sock.close()
|