test_gre: Add unit tests for NVGRE

This patch adds test case of NVGRE.
Also, fixes unit tests for GRE to the method using packet capture.
For that reason, add packet capture files.

Signed-off-by: Shinpei Muraoka <shinpei.muraoka@gmail.com>
Signed-off-by: FUJITA Tomonori <fujita.tomonori@lab.ntt.co.jp>
This commit is contained in:
Shinpei Muraoka 2016-12-20 11:44:56 +09:00 committed by FUJITA Tomonori
parent 19e415a19d
commit 6ea6e86e16
4 changed files with 79 additions and 45 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -13,69 +13,103 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import logging
import os
import sys
import unittest
import logging
import struct
from nose.tools import eq_
from nose.tools import ok_
import six
from nose.tools import eq_, raises
from ryu.lib.packet.gre import gre
from ryu.lib.packet.ether_types import ETH_TYPE_IP
from ryu.lib import pcaplib
from ryu.lib.packet import gre
from ryu.lib.packet import packet
from ryu.utils import binary_str
from ryu.lib.packet.ether_types import ETH_TYPE_IP, ETH_TYPE_TEB
LOG = logging.getLogger(__name__)
GENEVE_DATA_DIR = os.path.join(
os.path.dirname(sys.modules[__name__].__file__),
'../../packet_data/pcap/')
class Test_gre(unittest.TestCase):
""" Test case for gre
"""
Test case gre for ryu.lib.packet.gre.
"""
protocol = ETH_TYPE_IP
version = 0
gre_proto = ETH_TYPE_IP
nvgre_proto = ETH_TYPE_TEB
checksum = 0x440d
key = 1000
seq_number = 10
key = 256100
vsid = 1000
flow_id = 100
buf = struct.pack("!BBHH2xII", 0xb0, 0, protocol, checksum, key, seq_number)
gre = gre(protocol, checksum, key, seq_number)
gre = gre.gre(version=version, protocol=gre_proto, checksum=checksum,
key=key, seq_number=seq_number)
def setUp(self):
pass
def test_key_setter(self):
self.gre.key = self.key
eq_(self.gre._key, self.key)
eq_(self.gre._vsid, self.vsid)
eq_(self.gre._flow_id, self.flow_id)
def tearDown(self):
pass
def test_key_setter_none(self):
self.gre.key = None
eq_(self.gre._key, None)
eq_(self.gre._vsid, None)
eq_(self.gre._flow_id, None)
def test_init(self):
eq_(self.protocol, self.gre.protocol)
eq_(self.checksum, self.gre.checksum)
eq_(self.key, self.gre.key)
eq_(self.seq_number, self.gre.seq_number)
self.gre.key = self.key
def test_vsid_setter(self):
self.gre.vsid = self.vsid
eq_(self.gre._key, self.key)
eq_(self.gre._vsid, self.vsid)
eq_(self.gre._flow_id, self.flow_id)
def test_flowid_setter(self):
self.gre.flow_id = self.flow_id
eq_(self.gre._key, self.key)
eq_(self.gre._vsid, self.vsid)
eq_(self.gre._flow_id, self.flow_id)
def test_nvgre_init(self):
nvgre = gre.nvgre(version=self.version, vsid=self.vsid,
flow_id=self.flow_id)
eq_(nvgre.version, self.version)
eq_(nvgre.protocol, self.nvgre_proto)
eq_(nvgre.checksum, None)
eq_(nvgre.seq_number, None)
eq_(nvgre._key, self.key)
eq_(nvgre._vsid, self.vsid)
eq_(nvgre._flow_id, self.flow_id)
def test_parser(self):
res, _, _ = self.gre.parser(self.buf)
files = [
'gre_full_options',
'gre_no_option',
'gre_nvgre_option',
]
eq_(res.protocol, self.protocol)
eq_(res.checksum, self.checksum)
eq_(res.key, self.key)
eq_(res.seq_number, self.seq_number)
for f in files:
# print('*** testing %s ...' % f)
for _, buf in pcaplib.Reader(
open(GENEVE_DATA_DIR + f + '.pcap', 'rb')):
# Checks if message can be parsed as expected.
pkt = packet.Packet(buf)
gre_pkt = pkt.get_protocol(gre.gre)
ok_(isinstance(gre_pkt, gre.gre),
'Failed to parse Gre message: %s' % pkt)
def test_serialize(self):
buf = self.gre.serialize()
res = struct.unpack_from("!BBHH2xII", six.binary_type(buf))
# Checks if message can be serialized as expected.
pkt.serialize()
eq_(res[0], 0xb0)
eq_(res[1], 0)
eq_(res[2], self.protocol)
eq_(res[3], self.checksum)
eq_(res[4], self.key)
eq_(res[5], self.seq_number)
@raises(Exception)
def test_malformed_gre(self):
m_short_buf = self.buf[1:gre._MIN_LEN]
gre.parser(m_short_buf)
def test_json(self):
jsondict = self.gre.to_jsondict()
g = gre.from_jsondict(jsondict['gre'])
eq_(str(self.gre), str(g))
eq_(buf, pkt.data,
"b'%s' != b'%s'" % (binary_str(buf), binary_str(pkt.data)))