Source code for openvpn_status.parser
from __future__ import absolute_import
from six import Iterator, next, text_type
from .models import Status, Client, Routing, GlobalStats
from .descriptors import iter_descriptors, AssignmentValueError
[docs]class LogParser(Iterator):
"""The parser for parsing OpenVPN status log.
This kind of parser is stateful. So the :meth:`LogParser.parse` could be
called once in the same instance of parser.
"""
list_separator = u','
line_separator = u'\n'
terminator = u'END'
def __init__(self, lines):
self.lines = iter(lines)
self._last_line = None
self._rollback = False
def __next__(self):
if self._rollback:
self._rollback = False
return self._last_line
while True:
line = next(self.lines).strip()
if line:
self._last_line = line
return line
[docs] @classmethod
def fromstring(cls, content):
"""Creates a parser from content of log.
:param str content: The log content.
:return: The parser instance.
"""
return cls(content.strip().split(cls.line_separator))
def rollback(self):
self._rollback = True
def expect_line(self, content):
try:
line = next(self)
except StopIteration:
raise ParsingError('expected %r but got end of input' % content)
if line != content:
raise ParsingError('expected %r but got %r' % (content, line))
def expect_list(self):
try:
line = next(self)
except StopIteration:
raise ParsingError('expected list but got end of input')
splited = line.split(self.list_separator)
if len(splited) == 1:
raise ParsingError('expected list but got %r' % line)
return splited
def expect_tuple(self, name):
try:
line = next(self)
except StopIteration:
raise ParsingError('expected 2-tuple but got end of input')
splited = line.split(self.list_separator)
if len(splited) != 2:
raise ParsingError('expected 2-tuple but got %r' % line)
if splited[0] != name:
raise ParsingError('expected 2-tuple starting with %r' % name)
return splited[1]
[docs] def parse(self):
"""Parses the status log.
:raises ParsingError: if syntax error found in the log.
:return: The :class:`.models.Status` with filled data.
"""
try:
return self._parse()
except AssignmentValueError as e:
msg = text_type(e) \
.encode('ascii', 'backslashreplace') \
.decode('ascii')
raise ParsingError('expected valid format: %s' % msg)
def _parse(self):
status = Status()
self.expect_line(Status.client_list.label)
status.updated_at = self.expect_tuple(Status.updated_at.label)
status.client_list.update({
text_type(c.real_address): c
for c in self._parse_fields(Client, Status.routing_table.label)})
status.routing_table.update({
text_type(r.virtual_address): r
for r in self._parse_fields(Routing, Status.global_stats.label)})
status.global_stats = GlobalStats()
status.global_stats.max_bcast_mcast_queue_len = self.expect_tuple(
GlobalStats.max_bcast_mcast_queue_len.label)
self.expect_line(self.terminator)
return status
def _parse_fields(self, cls, next_line):
labels = self.expect_list()
descriptors = iter_descriptors(cls)
label_to_name = {
descriptor.label: name for name, descriptor in descriptors}
index_to_name = {
index: label_to_name[label] for index, label in enumerate(labels)}
while True:
try:
values = self.expect_list()
except ParsingError as list_error:
try:
self.rollback()
self.expect_line(next_line)
except ParsingError as line_error:
raise ParsingError(*(list_error.args + line_error.args))
else:
break
instance = cls()
for index, value in enumerate(values):
name = index_to_name[index]
setattr(instance, name, value)
yield instance
[docs]class ParsingError(Exception):
pass