diff options
| author | Matthew Poletiek <matthew.poletiek@gmail.com> | 2020-12-08 21:03:16 -0600 |
|---|---|---|
| committer | Matthew Poletiek <matthew.poletiek@gmail.com> | 2020-12-08 21:03:16 -0600 |
| commit | e99416456afd4aa8bde42016826f9a345291cbf3 (patch) | |
| tree | a7a95639cd1cb5dbe2d91a2ca8e8defafac4296d /tests/run_tests.py | |
| parent | 194cf4e5e0b6a2811103a9b739a72b9afe2b886c (diff) | |
| download | chirp-e99416456afd4aa8bde42016826f9a345291cbf3.tar.gz chirp-e99416456afd4aa8bde42016826f9a345291cbf3.tar.xz | |
Initial Commit
Diffstat (limited to 'tests/run_tests.py')
| -rwxr-xr-x | tests/run_tests.py | 1372 |
1 files changed, 1372 insertions, 0 deletions
diff --git a/tests/run_tests.py b/tests/run_tests.py new file mode 100755 index 0000000..e13fd8c --- /dev/null +++ b/tests/run_tests.py @@ -0,0 +1,1372 @@ +#!/usr/bin/env python +# +# Copyright 2011 Dan Smith <dsmith@danplanet.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from __future__ import print_function +from builtins import bytes +import copy +import traceback +import sys +import os +import shutil +import glob +import tempfile +import time +from optparse import OptionParser +from serial import Serial + +# change to the tests directory +scriptdir = os.path.dirname(sys.argv[0]) +if scriptdir: + os.chdir(scriptdir) + +sys.path.insert(0, "../") + +os.environ['CHIRP_TESTENV'] = 'sigh' +import logging +from chirp import logger + +class LoggerOpts(object): + quiet = 2 + verbose = 0 + log_file = os.path.join('logs', 'debug.log') + log_level = logging.DEBUG + +if not os.path.exists("logs"): + os.mkdir("logs") +logger.handle_options(LoggerOpts()) + +from chirp import CHIRP_VERSION +# FIXME: Not all drivers are py3 compatible in syntax, so punt on this +# until that time, and defer to the safe import loop below. +# from chirp.drivers import * +from chirp import chirp_common, directory +from chirp import import_logic, memmap, settings, errors +from chirp import settings + +directory.safe_import_drivers() + +from chirp.drivers import generic_csv + +TESTS = {} + +time.sleep = lambda s: None + + +class TestError(Exception): + def get_detail(self): + return str(self) + + +class TestInternalError(TestError): + pass + + +class TestCrashError(TestError): + def __init__(self, tb, exc, args): + Exception.__init__(self, str(exc)) + self.__tb = tb + self.__exc = exc + self.__args = args + self.__mytb = "".join(traceback.format_stack()) + + def __str__(self): + return str(self.__exc) + + def get_detail(self): + return str(self.__exc) + os.linesep + \ + ("Args were: %s" % self.__args) + os.linesep + \ + self.__tb + os.linesep + \ + "Called from:" + os.linesep + self.__mytb + + def get_original_exception(self): + return self.__exc + + +class TestFailedError(TestError): + def __init__(self, msg, detail=""): + TestError.__init__(self, msg) + self._detail = detail + + def get_detail(self): + return self._detail + + +class TestSkippedError(TestError): + pass + + +def get_tb(): + return traceback.format_exc() + + +class TestWrapper: + def __init__(self, dstclass, filename, dst=None): + self._ignored_exceptions = [] + self._dstclass = dstclass + self._filename = filename + self._make_reload = False + self._dst = dst + self.open() + + def pass_exception_type(self, et): + self._ignored_exceptions.append(et) + + def nopass_exception_type(self, et): + self._ignored_exceptions.remove(et) + + def make_reload(self): + self._make_reload = True + + def open(self): + if self._dst: + self._dst.load_mmap(self._filename) + else: + self._dst = self._dstclass(self._filename) + + def close(self): + self._dst.save_mmap(self._filename) + + def do(self, function, *args, **kwargs): + if self._make_reload: + try: + self.open() + except Exception as e: + raise TestCrashError(get_tb(), e, "[Loading]") + + try: + fn = getattr(self._dst, function) + except KeyError: + raise TestInternalError("Model lacks function `%s'" % function) + + try: + ret = fn(*args, **kwargs) + except Exception as e: + if type(e) in self._ignored_exceptions: + raise e + details = str(args) + str(kwargs) + for arg in args: + if isinstance(arg, chirp_common.Memory): + details += os.linesep + \ + os.linesep.join(["%s:%s" % (k, v) for k, v + in list(arg.__dict__.items())]) + raise TestCrashError(get_tb(), e, details) + + if self._make_reload: + try: + self.close() + except Exception as e: + raise TestCrashError(get_tb(), e, "[Saving]") + + return ret + + def get_id(self): + return "%s %s %s" % (self._dst.VENDOR, + self._dst.MODEL, + self._dst.VARIANT) + + def get_radio(self): + return self._dst + + +class TestCase: + def __init__(self, wrapper): + self._wrapper = wrapper + + def prepare(self): + pass + + def run(self): + "Return True or False for Pass/Fail" + pass + + def cleanup(self): + pass + + def compare_mem(self, a, b, ignore=None): + rf = self._wrapper.do("get_features") + + if a.tmode == "Cross": + tx_mode, rx_mode = a.cross_mode.split("->") + + for k, v in list(a.__dict__.items()): + if ignore and k in ignore: + continue + if k == "power": + continue # FIXME + elif k == "immutable": + continue + elif k == "name": + if not rf.has_name: + continue # Don't complain about name, if not supported + else: + # Name mismatch fair if filter_name() is right + v = self._wrapper.do("filter_name", v).rstrip() + elif k == "tuning_step" and not rf.has_tuning_step: + continue + elif k == "rtone" and not ( + a.tmode == "Tone" or + (a.tmode == "TSQL" and not rf.has_ctone) or + (a.tmode == "Cross" and tx_mode == "Tone") or + (a.tmode == "Cross" and rx_mode == "Tone" and + not rf.has_ctone) + ): + continue + elif k == "ctone" and (not rf.has_ctone or + not (a.tmode == "TSQL" or + (a.tmode == "Cross" and + rx_mode == "Tone"))): + continue + elif k == "dtcs" and not ( + (a.tmode == "DTCS" and not rf.has_rx_dtcs) or + (a.tmode == "Cross" and tx_mode == "DTCS") or + (a.tmode == "Cross" and rx_mode == "DTCS" and + not rf.has_rx_dtcs)): + continue + elif k == "rx_dtcs" and (not rf.has_rx_dtcs or + not (a.tmode == "Cross" and + rx_mode == "DTCS")): + continue + elif k == "offset" and not a.duplex: + continue + elif k == "cross_mode" and a.tmode != "Cross": + continue + + try: + if b.__dict__[k] != v: + msg = "Field `%s' " % k + \ + "is `%s', " % b.__dict__[k] + \ + "expected `%s' " % v + # If we set a channel that came back with a duplex + # of 'off', we may have been outside the transmit range of + # the radio, so we should not fail. + if k == "duplex" and b.__dict__[k] == "off": + continue + details = msg + details += os.linesep + "### Wanted:" + os.linesep + details += os.linesep.join(["%s:%s" % (k, v) for k, v + in list(a.__dict__.items())]) + details += os.linesep + "### Got:" + os.linesep + details += os.linesep.join(["%s:%s" % (k, v) for k, v + in list(b.__dict__.items())]) + raise TestFailedError(msg, details) + except KeyError as e: + print(sorted(a.__dict__.keys())) + print(sorted(b.__dict__.keys())) + raise + + +class TestCaseCopyAll(TestCase): + "Copy Memories From CSV" + + def __str__(self): + return "CopyAll" + + def prepare(self): + testbase = os.path.dirname(os.path.abspath(__file__)) + source = os.path.join(testbase, 'images', 'Generic_CSV.csv') + self._src = generic_csv.CSVRadio(source) + + def run(self): + src_rf = self._src.get_features() + bounds = src_rf.memory_bounds + + dst_rf = self._wrapper.do("get_features") + dst_number = dst_rf.memory_bounds[0] + + failures = [] + + for number in range(bounds[0], bounds[1]): + src_mem = self._src.get_memory(number) + if src_mem.empty: + continue + + try: + dst_mem = import_logic.import_mem(self._wrapper.get_radio(), + src_rf, src_mem, + overrides={ + "number": dst_number}) + import_logic.import_bank(self._wrapper.get_radio(), + self._src, + dst_mem, + src_mem) + except import_logic.DestNotCompatible: + continue + except import_logic.ImportError as e: + failures.append(TestFailedError("<%i>: Import Failed: %s" % + (dst_number, e))) + continue + except Exception as e: + raise TestCrashError(get_tb(), e, "[Import]") + + self._wrapper.do("set_memory", dst_mem) + ret_mem = self._wrapper.do("get_memory", dst_number) + + try: + self.compare_mem(dst_mem, ret_mem) + except TestFailedError as e: + failures.append( + TestFailedError("<%i>: %s" % (number, e), e.get_detail())) + + return failures +TESTS["CopyAll"] = TestCaseCopyAll + + +class TestCaseBruteForce(TestCase): + def __str__(self): + return "BruteForce" + + def set_and_compare(self, m): + msgs = self._wrapper.do("validate_memory", m) + if msgs: + # If the radio correctly refuses memories it can't + # store, don't fail + return + + self._wrapper.do("set_memory", m) + ret_m = self._wrapper.do("get_memory", m.number) + + # Damned Baofeng radios don't seem to properly store + # shift and direction, so be gracious here + if m.duplex == "split" and ret_m.duplex in ["-", "+"]: + ret_m.offset = ret_m.freq + \ + (ret_m.offset * int(ret_m.duplex + "1")) + ret_m.duplex = "split" + + self.compare_mem(m, ret_m) + + def do_tone(self, m, rf): + self._wrapper.pass_exception_type(errors.UnsupportedToneError) + for tone in chirp_common.TONES: + for tmode in rf.valid_tmodes: + if tmode not in chirp_common.TONE_MODES: + continue + elif tmode in ["DTCS", "DTCS-R", "Cross"]: + continue # We'll test DCS and Cross tones separately + + m.tmode = tmode + if tmode == "": + pass + elif tmode == "Tone": + m.rtone = tone + elif tmode in ["TSQL", "TSQL-R"]: + if rf.has_ctone: + m.ctone = tone + else: + m.rtone = tone + else: + raise TestInternalError("Unknown tone mode `%s'" % tmode) + + try: + self.set_and_compare(m) + except errors.UnsupportedToneError as e: + # If a radio doesn't support a particular tone value, + # don't punish it + pass + self._wrapper.nopass_exception_type(errors.UnsupportedToneError) + + def do_dtcs(self, m, rf): + if not rf.has_dtcs: + return + + m.tmode = "DTCS" + for code in rf.valid_dtcs_codes: + m.dtcs = code + self.set_and_compare(m) + + if not rf.has_dtcs_polarity: + return + + for pol in rf.valid_dtcs_pols: + m.dtcs_polarity = pol + self.set_and_compare(m) + + def do_cross(self, m, rf): + if not rf.has_cross: + return + + m.tmode = "Cross" + # No fair asking a radio to detect two identical tones as Cross instead + # of TSQL + m.rtone = 100.0 + m.ctone = 107.2 + m.dtcs = 506 + m.rx_dtcs = 516 + for cross_mode in rf.valid_cross_modes: + m.cross_mode = cross_mode + self.set_and_compare(m) + + def do_duplex(self, m, rf): + for duplex in rf.valid_duplexes: + if duplex not in ["", "-", "+", "split"]: + continue + if duplex == "split" and not rf.can_odd_split: + raise TestFailedError("Forgot to set rf.can_odd_split!") + if duplex == "split": + m.offset = rf.valid_bands[0][1] - 100000 + m.duplex = duplex + self.set_and_compare(m) + + if rf.can_odd_split and "split" not in rf.valid_duplexes: + raise TestFailedError("Paste error: rf.can_odd_split defined, but " + "split duplex not supported.") + + def do_skip(self, m, rf): + for skip in rf.valid_skips: + m.skip = skip + self.set_and_compare(m) + + def do_mode(self, m, rf): + def ensure_urcall(call): + l = self._wrapper.do("get_urcall_list") + l[0] = call + self._wrapper.do("set_urcall_list", l) + + def ensure_rptcall(call): + l = self._wrapper.do("get_repeater_call_list") + l[0] = call + self._wrapper.do("set_repeater_call_list", l) + + def freq_is_ok(freq): + for lo, hi in rf.valid_bands: + if freq > lo and freq < hi: + return True + return False + + successes = 0 + for mode in rf.valid_modes: + tmp = copy.deepcopy(m) + if mode not in chirp_common.MODES: + continue + if mode == "DV": + tmp = chirp_common.DVMemory() + try: + ensure_urcall(tmp.dv_urcall) + ensure_rptcall(tmp.dv_rpt1call) + ensure_rptcall(tmp.dv_rpt2call) + except IndexError: + if rf.requires_call_lists: + raise + else: + # This radio may not do call lists at all, + # so let it slide + pass + if mode == "FM" and freq_is_ok(tmp.freq + 100000000): + # Some radios don't support FM below approximately 30MHz, + # so jump up by 100MHz, if they support that + tmp.freq += 100000000 + + tmp.mode = mode + + if rf.validate_memory(tmp): + # A result (of error messages) from validate means the radio + # thinks this is invalid, so don't fail the test + print('Failed to validate %s: %s' % (tmp, rf.validate_memory(tmp))) + continue + + self.set_and_compare(tmp) + successes += 1 + + if (not successes) and rf.valid_modes: + raise TestFailedError("All modes were skipped, " + "something went wrong") + + def run(self): + rf = self._wrapper.do("get_features") + + def clean_mem(): + m = chirp_common.Memory() + m.number = rf.memory_bounds[0] + try: + m.mode = rf.valid_modes[0] + except IndexError: + pass + if rf.valid_bands: + m.freq = rf.valid_bands[0][0] + 600000 + else: + m.freq = 146520000 + if m.freq < 30000000 and "AM" in rf.valid_modes: + m.mode = "AM" + return m + + tests = [ + self.do_tone, + self.do_dtcs, + self.do_cross, + self.do_duplex, + self.do_skip, + self.do_mode, + ] + for test in tests: + test(clean_mem(), rf) + + if 12.5 in rf.valid_tuning_steps and \ + "split" in rf.valid_duplexes: + m = clean_mem() + if rf.valid_bands: + m.offset = rf.valid_bands[0][1] - 12500 + else: + m.offset = 151137500 + m.duplex = "split" + self.set_and_compare(m) + + return [] +TESTS["BruteForce"] = TestCaseBruteForce + + +class TestCaseEdges(TestCase): + def __str__(self): + return "Edges" + + def _mem(self, rf): + m = chirp_common.Memory() + m.freq = rf.valid_bands[0][0] + 1000000 + if m.freq < 30000000 and "AM" in rf.valid_modes: + m.mode = "AM" + else: + try: + m.mode = rf.valid_modes[0] + except IndexError: + pass + + for i in range(*rf.memory_bounds): + m.number = i + if not self._wrapper.do("validate_memory", m): + return m + + raise TestSkippedError("No mutable memory locations found") + + def do_longname(self, rf): + m = self._mem(rf) + m.name = ("X" * 256) # Should be longer than any radio can handle + m.name = self._wrapper.do("filter_name", m.name) + + self._wrapper.do("set_memory", m) + n = self._wrapper.do("get_memory", m.number) + + self.compare_mem(m, n) + + def do_badname(self, rf): + m = self._mem(rf) + + ascii = "".join([chr(x) for x in range(ord(" "), ord("~")+1)]) + for i in range(0, len(ascii), 4): + m.name = self._wrapper.do("filter_name", ascii[i:i+4]) + self._wrapper.do("set_memory", m) + n = self._wrapper.do("get_memory", m.number) + self.compare_mem(m, n) + + def do_bandedges(self, rf): + m = self._mem(rf) + min_step = min(rf.has_tuning_step and rf.valid_tuning_steps or [10]) + + for low, high in rf.valid_bands: + for freq in (low, high - int(min_step * 1000)): + m.freq = freq + if self._wrapper.do("validate_memory", m): + # Radio doesn't like it, so skip + continue + + self._wrapper.do("set_memory", m) + n = self._wrapper.do("get_memory", m.number) + self.compare_mem(m, n) + + def do_oddsteps(self, rf): + odd_steps = { + 145000000: [145856250, 145862500], + 445000000: [445856250, 445862500], + 862000000: [862731250, 862737500], + } + + m = self._mem(rf) + + for low, high in rf.valid_bands: + for band, totest in list(odd_steps.items()): + if band < low or band > high: + continue + for testfreq in totest: + step = chirp_common.required_step(testfreq) + if step not in rf.valid_tuning_steps: + continue + + m.freq = testfreq + m.tuning_step = step + self._wrapper.do("set_memory", m) + n = self._wrapper.do("get_memory", m.number) + self.compare_mem(m, n, ignore=['tuning_step']) + + def do_empty_to_not(self, rf): + firstband = rf.valid_bands[0] + testfreq = firstband[0] + for loc in range(*rf.memory_bounds): + m = self._wrapper.do('get_memory', loc) + if m.empty: + m.empty = False + m.freq = testfreq + self._wrapper.do('set_memory', m) + m = self._wrapper.do('get_memory', loc) + if m.freq == testfreq: + return + else: + raise TestFailedError('Radio failed to set an empty ' + 'location (%i)' % loc) + + def do_delete_memory(self, rf): + firstband = rf.valid_bands[0] + testfreq = firstband[0] + for loc in range(*rf.memory_bounds): + if loc == rf.memory_bounds[0]: + # Some radios will not allow you to delete the first memory + # /me glares at yaesu + continue + m = self._wrapper.do('get_memory', loc) + if not m.empty: + m.empty = True + self._wrapper.do('set_memory', m) + m = self._wrapper.do('get_memory', loc) + if not m.empty: + raise TestFailedError('Radio refused to delete a memory ' + 'location (%i)' % loc) + else: + return + + def run(self): + rf = self._wrapper.do("get_features") + + if not rf.valid_bands: + raise TestFailedError("Radio does not provide valid bands!") + + self.do_longname(rf) + self.do_bandedges(rf) + self.do_oddsteps(rf) + self.do_badname(rf) + if rf.can_delete: + self.do_empty_to_not(rf) + self.do_delete_memory(rf) + + return [] + +TESTS["Edges"] = TestCaseEdges + + +class TestCaseSettings(TestCase): + def __str__(self): + return "Settings" + + def do_get_settings(self, rf): + lst = self._wrapper.do("get_settings") + if not isinstance(lst, list): + raise TestFailedError("Invalid Radio Settings") + + def do_same_settings(self, rf): + o = self._wrapper.do("get_settings") + self._wrapper.do("set_settings", o) + n = self._wrapper.do("get_settings") + list(map(self.compare_settings, o, n)) + + @staticmethod + def compare_settings(a, b): + try: + if isinstance(a, settings.RadioSettingValue): + raise TypeError('Hit bottom') + list(map(TestCaseSettings.compare_settings, a, b)) + except TypeError: + if a.get_value() != b.get_value(): + msg = "Field is `%s', " % b + \ + "expected `%s' " % a + details = msg + raise TestFailedError(msg, details) + + def run(self): + rf = self._wrapper.do("get_features") + + if not rf.has_settings: + raise TestSkippedError("Settings not supported") + + self.do_get_settings(rf) + self.do_same_settings(rf) + + return [] + +TESTS["Settings"] = TestCaseSettings + + +class TestCaseBanks(TestCase): + def __str__(self): + return "Banks" + + def _do_bank_names(self, rf, testname): + bm = self._wrapper.do("get_bank_model") + banks = bm.get_mappings() + + for bank in banks: + name = bank.get_name() + try: + bank.set_name(testname) + except AttributeError: + return [], [] + except Exception as e: + if str(e) == "Not implemented": + return [], [] + else: + raise e + + return banks, bm.get_mappings() + + def do_bank_names(self, rf): + banks, newbanks = self._do_bank_names(rf, "T") + + for i in range(0, len(banks)): + if banks[i].get_name() != newbanks[i].get_name(): + raise TestFailedError("Bank names not preserved", + "Tried %s on %i\nGot %s" % (banks[i], + i, + newbanks[i])) + + def do_bank_names_toolong(self, rf): + testname = "Not possibly this long" + banks, newbanks = self._do_bank_names(rf, testname) + + for i in range(0, len(newbanks)): + # Truncation is allowed, but not failure + if not testname.lower().startswith(str(newbanks[i]).lower()): + raise TestFailedError("Bank names not properly truncated", + "Tried: %s on %i\nGot: %s" % + (testname, i, newbanks[i])) + + def do_bank_names_no_trailing_whitespace(self, rf): + banks, newbanks = self._do_bank_names(rf, "foo ") + + for bank in newbanks: + if str(bank) != str(bank).rstrip(): + raise TestFailedError("Bank names stored with " + + "trailing whitespace") + + def do_bank_store(self, rf): + loc = rf.memory_bounds[0] + mem = chirp_common.Memory() + mem.number = loc + mem.freq = rf.valid_bands[0][0] + 100000 + + # Make sure the memory is empty and we create it from scratch + mem.empty = True + self._wrapper.do("set_memory", mem) + + mem.empty = False + self._wrapper.do("set_memory", mem) + + model = self._wrapper.do("get_bank_model") + + # If in your bank model every channel has to be tied to a bank, just + # add a variable named channelAlwaysHasBank to it and make it True + try: + channelAlwaysHasBank = model.channelAlwaysHasBank + except: + channelAlwaysHasBank = False + + mem_banks = model.get_memory_mappings(mem) + if channelAlwaysHasBank: + if len(mem_banks) == 0: + raise TestFailedError("Freshly-created memory has no banks " + + "and it should", "Bank: %s" % + str(mem_banks)) + else: + if len(mem_banks) != 0: + raise TestFailedError("Freshly-created memory has banks " + + "and should not", "Bank: %s" % + str(mem_banks)) + + banks = model.get_mappings() + + def verify(bank): + if bank not in model.get_memory_mappings(mem): + return "Memory does not claim bank" + + if loc not in [x.number for x in model.get_mapping_memories(bank)]: + return "Bank does not claim memory" + + return None + + model.add_memory_to_mapping(mem, banks[0]) + reason = verify(banks[0]) + if reason is not None: + raise TestFailedError("Setting memory bank does not persist", + "%s\nMemory banks:%s\nBank memories:%s" % + (reason, + model.get_memory_mappings(mem), + model.get_mapping_memories(banks[0]))) + + model.remove_memory_from_mapping(mem, banks[0]) + reason = verify(banks[0]) + if reason is None and not channelAlwaysHasBank: + raise TestFailedError("Memory remains in bank after remove", + reason) + + try: + model.remove_memory_from_mapping(mem, banks[0]) + did_error = False + except Exception: + did_error = True + + if not did_error and not channelAlwaysHasBank: + raise TestFailedError("Removing memory from non-member bank " + + "did not raise Exception") + + def do_bank_index(self, rf): + if not rf.has_bank_index: + return + + loc = rf.memory_bounds[0] + mem = chirp_common.Memory() + mem.number = loc + mem.freq = rf.valid_bands[0][0] + 100000 + + self._wrapper.do("set_memory", mem) + + model = self._wrapper.do("get_bank_model") + banks = model.get_mappings() + index_bounds = model.get_index_bounds() + + model.add_memory_to_mapping(mem, banks[0]) + for i in range(0, *index_bounds): + model.set_memory_index(mem, banks[0], i) + if model.get_memory_index(mem, banks[0]) != i: + raise TestFailedError("Bank index not persisted") + + suggested_index = model.get_next_mapping_index(banks[0]) + if suggested_index not in list(range(*index_bounds)): + raise TestFailedError("Suggested bank index not in valid range", + "Got %i, range is %s" % (suggested_index, + index_bounds)) + + def run(self): + rf = self._wrapper.do("get_features") + + if not rf.has_bank: + raise TestSkippedError("Banks not supported") + + self.do_bank_names(rf) + self.do_bank_names_toolong(rf) + self.do_bank_names_no_trailing_whitespace(rf) + self.do_bank_store(rf) + # Again to make sure we clear bank info on delete + self.do_bank_store(rf) + self.do_bank_index(rf) + + return [] + +TESTS["Banks"] = TestCaseBanks + + +class TestCaseDetect(TestCase): + def __str__(self): + return "Detect" + + def run(self): + if isinstance(self._wrapper._dst, chirp_common.LiveRadio): + raise TestSkippedError("This is a live radio") + + filename = self._wrapper._filename + + try: + radio = directory.get_radio_by_image(filename) + except Exception as e: + raise TestFailedError("Failed to detect", str(e)) + + if radio.__class__.__name__ == 'DynamicRadioAlias': + # This was detected via metadata and wrapped, which means + # we found the appropriate class. + pass + elif issubclass(self._wrapper._dstclass, radio.__class__): + pass + elif issubclass(radio.__class__, self._wrapper._dstclass): + pass + elif radio.__class__ != self._wrapper._dstclass: + raise TestFailedError("%s detected as %s" % + (self._wrapper._dstclass, radio.__class__)) + return [] + +TESTS["Detect"] = TestCaseDetect + + +class TestCaseClone(TestCase): + class SerialNone: + def __init__(self, isbytes): + self.isbytes = isbytes + self.mismatch = False + self.mismatch_at = None + + def read(self, size): + if self.isbytes: + return b"" + else: + return "" + + def write(self, data): + expected = self.isbytes and bytes or str + if six.PY2: + # One driver uses bytearray() which will trigger this + # check even though it works fine on py2. So, only + # do this check for PY3 which is where it matters + # anyway. + pass + elif not self.mismatch and not isinstance(data, expected): + self.mismatch = True + self.mismatch_at = ''.join(traceback.format_stack()) + pass + + def setBaudrate(self, rate): + pass + + def setTimeout(self, timeout): + pass + + def setParity(self, parity): + pass + + def __str__(self): + return self.__class__.__name__.replace("Serial", "") + + class SerialError(SerialNone): + def read(self, size): + raise Exception("Foo") + + def write(self, data): + raise Exception("Bar") + + class SerialGarbage(SerialNone): + def read(self, size): + if self.isbytes: + buf = [] + for i in range(0, size): + buf += i % 256 + return bytes(buf) + else: + buf = "" + for i in range(0, size): + buf += chr(i % 256) + return buf + + class SerialShortGarbage(SerialNone): + def read(self, size): + if self.isbytes: + return b'\x00' * (size - 1) + else: + return "\x00" * (size - 1) + + def __str__(self): + return "Clone" + + def _run(self, serial): + error = None + live = isinstance(self._wrapper._dst, chirp_common.LiveRadio) + clone = isinstance(self._wrapper._dst, chirp_common.CloneModeRadio) + + if not clone and not live: + raise TestSkippedError('Does not support clone') + + try: + radio = self._wrapper._dst.__class__(serial) + radio.status_fn = lambda s: True + except Exception as e: + error = e + + if not live: + if error is not None: + raise TestFailedError("Clone radio tried to read from " + + "serial on init") + else: + if not isinstance(error, errors.RadioError): + raise TestFailedError("Live radio didn't notice serial " + + "was dead on init") + return [] # Nothing more to test on an error'd live radio + + error = None + try: + radio.sync_in() + except Exception as e: + error = e + + if error is None: + raise TestFailedError("Radio did not raise exception " + + "with %s data" % serial, + "On sync_in()") + elif not isinstance(error, errors.RadioError): + raise TestFailedError("Radio did not raise RadioError " + + "with %s data" % serial, + "sync_in() Got: %s (%s)\n%s" % + (error.__class__.__name__, + error, get_tb())) + + if radio.NEEDS_COMPAT_SERIAL: + radio._mmap = memmap.MemoryMap("\x00" * (1024 * 128)) + else: + radio._mmap = memmap.MemoryMapBytes(bytes(b"\x00") * (1024 * 128)) + + error = None + try: + radio.sync_out() + except Exception as e: + error = e + + if error is None: + raise TestFailedError("Radio did not raise exception " + + "with %s data" % serial, + "On sync_out()") + elif not isinstance(error, errors.RadioError): + raise TestFailedError("Radio did not raise RadioError " + + "with %s data" % serial, + "sync_out(): Got: %s (%s)" % + (error.__class__.__name__, error)) + + if serial.mismatch: + raise TestFailedError("Radio tried to write the wrong " + "type of data to the %s pipe." % ( + serial.__class__.__name__), + "TestClone:%s\n%s" % ( + serial.__class__.__name__, + serial.mismatch_at)) + + return [] + + def run(self): + isbytes = not self._wrapper._dst.NEEDS_COMPAT_SERIAL + self._run(self.SerialError(isbytes)) + self._run(self.SerialNone(isbytes)) + self._run(self.SerialGarbage(isbytes)) + self._run(self.SerialShortGarbage(isbytes)) + return [] + +TESTS["Clone"] = TestCaseClone + + +class TestOutput: + def __init__(self, output=None): + if not output: + output = sys.stdout + self._out = output + + def prepare(self): + pass + + def cleanup(self): + pass + + def _print(self, string): + print(string, file=self._out) + + def report(self, rclass, tc, msg, e): + name = ("%s %s" % (rclass.MODEL, rclass.VARIANT))[:13] + self._print("%9s %-13s %-10s %s %s" % (rclass.VENDOR.split(" ")[0], + name, + tc, + msg, e)) + + +class TestOutputANSI(TestOutput): + def __init__(self, output=None): + TestOutput.__init__(self, output) + self.__counts = { + "PASSED": 0, + "FAILED": 0, + "CRASHED": 0, + "SKIPPED": 0, + } + self.__total = 0 + + def report(self, rclass, tc, msg, e): + self.__total += 1 + self.__counts[msg] += 1 + msg += ":" + if os.isatty(1): + if msg == "PASSED:": + msg = "\033[1;32m%8s\033[0m" % msg + elif msg == "FAILED:": + msg = "\033[1;41m%8s\033[0m" % msg + elif msg == "CRASHED:": + msg = "\033[1;45m%8s\033[0m" % msg + elif msg == "SKIPPED:": + msg = "\033[1;32m%8s\033[0m" % msg + else: + msg = "%8s" % msg + + TestOutput.report(self, rclass, tc, msg, e) + + def cleanup(self): + self._print("-" * 70) + self._print("Results:") + self._print(" %-7s: %i" % ("TOTAL", self.__total)) + for t, c in list(self.__counts.items()): + self._print(" %-7s: %i" % (t, c)) + + +class TestOutputHTML(TestOutput): + def __init__(self, filename): + self._filename = filename + + def prepare(self): + print("Writing to %s" % self._filename, end=' ') + sys.stdout.flush() + self._out = file(self._filename, "w") + s = """ +<html> +<head> +<title>Test report for CHIRP version %s</title> +<style> +table.testlist { + border: thin solid black; + border-collapse: collapse; +} +td { + border: thin solid black; + padding: 2px; +} +th { + background-color: silver; + border: thin solid black; + padding: 2px; +} +td.PASSED { + background-color: green; +} +td.FAILED { + background-color: red; +} +td.CRASHED { + background-color: purple; +} +td.SKIPPED { + background-color: green; +} +</style> +</head> +<body> +<h1>Test report for CHIRP version %s</h1> +<h3>Generated on %s (%s)</h3> +<table class="testlist"> +<tr> + <th>Vendor</th><th>Model</th><th>Test Case</th> + <th>Status</th><th>Message</th> +</tr> +""" % (CHIRP_VERSION, CHIRP_VERSION, time.strftime("%x at %X"), os.name) + print(s, file=self._out) + + def cleanup(self): + print("</table></body>", file=self._out) + self._out.close() + print("Done") + + def report(self, rclass, tc, msg, e): + s = ("<tr class='%s'>" % msg) + \ + ("<td class='vendor'>%s</td>" % rclass.VENDOR) + \ + ("<td class='model'>%s %s</td>" % + (rclass.MODEL, rclass.VARIANT)) + \ + ("<td class='tc'>%s</td>" % tc) + \ + ("<td class='%s'>%s</td>" % (msg, msg)) + \ + ("<td class='error'>%s</td>" % e) + \ + "</tr>" + print(s, file=self._out) + sys.stdout.write(".") + sys.stdout.flush() + + +class TestRunner: + def __init__(self, images_dir, test_list, test_out): + self._images_dir = images_dir + self._test_list = test_list + self._test_out = test_out + if not os.path.exists("tmp"): + os.mkdir("tmp") + + def _make_list(self): + run_list = [] + images = glob.glob(os.path.join(self._images_dir, "*.img")) + for image in sorted(images): + drv_name, _ = os.path.splitext(os.path.basename(image)) + run_list.append((directory.get_radio(drv_name), image)) + return run_list + + def report(self, rclass, tc, msg, e): + self._test_out.report(rclass, tc, msg, e) + + def log(self, rclass, tc, e): + fn = "logs/%s_%s.log" % (directory.radio_class_id(rclass), tc) + log = file(fn, "a") + print("---- Begin test %s ----" % tc, file=log) + log.write(e.get_detail()) + print(file=log) + print("---- End test %s ----" % tc, file=log) + log.close() + + def nuke_log(self, rclass, tc): + fn = "logs/%s_%s.log" % (directory.radio_class_id(rclass), tc) + if os.path.exists(fn): + os.remove(fn) + + def _run_one(self, rclass, parm, dst=None): + nfailed = 0 + for tcclass in self._test_list: + nprinted = 0 + tw = TestWrapper(rclass, parm, dst=dst) + tc = tcclass(tw) + + self.nuke_log(rclass, tc) + + tc.prepare() + + try: + failures = tc.run() + for e in failures: + self.report(rclass, tc, "FAILED", e) + if e.get_detail(): + self.log(rclass, tc, e) + nfailed += 1 + nprinted += 1 + except TestFailedError as e: + self.report(rclass, tc, "FAILED", e) + if e.get_detail(): + self.log(rclass, tc, e) + nfailed += 1 + nprinted += 1 + except TestCrashError as e: + self.report(rclass, tc, "CRASHED", e) + self.log(rclass, tc, e) + nfailed += 1 + nprinted += 1 + except TestSkippedError as e: + self.report(rclass, tc, "SKIPPED", e) + self.log(rclass, tc, e) + nprinted += 1 + + tc.cleanup() + + if not nprinted: + self.report(rclass, tc, "PASSED", "All tests") + + return nfailed + + def run_rclass_image(self, rclass, image, dst=None): + rid = "%s_%s_" % (rclass.VENDOR, rclass.MODEL) + rid = rid.replace("/", "_") + # Do this for things like Generic_CSV, that demand it + _base, ext = os.path.splitext(image) + testimage = tempfile.mktemp(ext, rid) + shutil.copy(image, testimage) + + try: + tw = TestWrapper(rclass, testimage, dst=dst) + rf = tw.do("get_features") + if rf.has_sub_devices: + devices = tw.do("get_sub_devices") + failed = 0 + for dev in devices: + failed += self.run_rclass_image(dev.__class__, image, dst=dev) + return failed + else: + return self._run_one(rclass, image, dst=dst) + finally: + os.remove(testimage) + + def run_list(self, run_list): + def _key(pair): + return pair[0].VENDOR + pair[0].MODEL + pair[0].VARIANT + failed = 0 + for rclass, image in sorted(run_list, key=_key): + failed += self.run_rclass_image(rclass, image) + return failed + + def run_all(self): + run_list = self._make_list() + return self.run_list(run_list) + + def run_one(self, drv_name): + return self.run_rclass_image(directory.get_radio(drv_name), + os.path.join("images", + "%s.img" % drv_name)) + + def run_one_live(self, drv_name, port): + rclass = directory.get_radio(drv_name) + pipe = Serial(port=port, baudrate=rclass.BAUD_RATE, timeout=0.5) + tw = TestWrapper(rclass, pipe) + rf = tw.do("get_features") + if rf.has_sub_devices: + devices = tw.do("get_sub_devices") + failed = 0 + for device in devices: + failed += self._run_one(device.__class__, pipe) + return failed + else: + return self._run_one(rclass, pipe) + +if __name__ == "__main__": + import sys + + images = glob.glob("images/*.img") + tests = [os.path.splitext(os.path.basename(img))[0] for img in images] + + op = OptionParser() + op.add_option("-d", "--driver", dest="driver", default=None, + help="Driver to test (omit for all)") + op.add_option("-t", "--test", dest="test", default=None, + help="Test to run (omit for all)") + op.add_option("-e", "--exclude", dest="exclude", default=None, + help="Test to exclude") + op.add_option("", "--html", dest="html", default=None, + help="Output to HTML file") + op.add_option("-l", "--live", dest="live", default=None, + help="Live radio on this port (requires -d)") + op.usage = """ +Available drivers: +%s +Available tests: +%s +""" % ("\n".join([" %s" % x for x in sorted(tests)]), + "\n".join([" %s" % x for x in sorted(TESTS.keys())])) + + (options, args) = op.parse_args() + + if options.html: + test_out = TestOutputHTML(options.html) + else: + stdout = sys.stdout + if not os.path.exists("logs"): + os.mkdir("logs") + sys.stdout = file("logs/verbose", "w") + test_out = TestOutputANSI(stdout) + + test_out.prepare() + + if options.exclude: + del TESTS[options.exclude] + + if options.test: + tr = TestRunner("images", [TESTS[options.test]], test_out) + else: + tr = TestRunner("images", list(TESTS.values()), test_out) + + if options.live: + if not options.driver: + print("Live mode requires a driver to be specified") + sys.exit(1) + failed = tr.run_one_live(options.driver, options.live) + elif options.driver: + failed = tr.run_one(options.driver) + else: + failed = tr.run_all() + + test_out.cleanup() + + sys.exit(failed) |
