diff options
| author | Matthew Poletiek <matthew.poletiek@gmail.com> | 2020-12-08 21:03:16 -0600 |
|---|---|---|
| committer | Matthew Poletiek <matthew.poletiek@gmail.com> | 2020-12-08 21:03:16 -0600 |
| commit | e99416456afd4aa8bde42016826f9a345291cbf3 (patch) | |
| tree | a7a95639cd1cb5dbe2d91a2ca8e8defafac4296d /tests/icom_clone_simulator.py | |
| parent | 194cf4e5e0b6a2811103a9b739a72b9afe2b886c (diff) | |
| download | chirp-e99416456afd4aa8bde42016826f9a345291cbf3.tar.gz chirp-e99416456afd4aa8bde42016826f9a345291cbf3.tar.xz | |
Initial Commit
Diffstat (limited to 'tests/icom_clone_simulator.py')
| -rw-r--r-- | tests/icom_clone_simulator.py | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/tests/icom_clone_simulator.py b/tests/icom_clone_simulator.py new file mode 100644 index 0000000..e13572a --- /dev/null +++ b/tests/icom_clone_simulator.py @@ -0,0 +1,195 @@ +# Copyright 2019 Dan Smith <dsmith@danplanet.com> +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from builtins import bytes + +import struct + +from chirp.drivers import icf + +import logging + +LOG = logging.getLogger(__name__) + + +import sys +l = logging.getLogger() +l.level = logging.ERROR +l.addHandler(logging.StreamHandler(sys.stdout)) + +class FakeIcomRadio(object): + def __init__(self, radio, mapfile=None): + self._buffer = bytes(b'') + self._radio = radio + if not mapfile: + self._memory = bytes(b'\x00') * radio.get_memsize() + else: + self.load_from_file(mapfile) + + def load_from_file(self, filename): + with open(filename, 'rb') as f: + self._memory = bytes(f.read()) + LOG.debug('Initialized %i bytes from %s' % (len(self._memory), + filename)) + + def read(self, count): + """read() from radio, so here we synthesize responses""" + chunk = self._buffer[:count] + self._buffer = self._buffer[count:] + return chunk + + def queue(self, data): + # LOG.debug('Queuing: %r' % data) + self._buffer += data + + def make_response(self, cmd, payload): + return bytes([ + 0xFE, 0xFE, + 0xEF, # Radio + 0xEE, # PC + cmd, + ]) + payload + bytes([0xFD]) + + @property + def address_fmt(self): + if self._radio.get_memsize() > 0x10000: + return 'I' + else: + return 'H' + + def do_clone_out(self): + LOG.debug('Clone from radio started') + size = 16 + for addr in range(0, self._radio.get_memsize(), size): + if len(self._memory[addr:]) < 4: + # IC-W32E has an off-by-one hack for detection, + # which will cause us to send a short one-byte + # block of garbage, unlike the real radio. So, + # if we get to the end and have a few bytes + # left, don't be stupid. + break + header = bytes(struct.pack('>%sB' % self.address_fmt, + addr, size)) + #LOG.debug('Header for %02x@%04x: %r' % ( + # size, addr, header)) + chunk = [] + cs = 0 + for byte in header: + chunk.extend(x for x in bytes(b'%02X' % byte)) + cs += byte + #LOG.debug('Chunk so far: %r' % chunk) + for byte in self._memory[addr:addr + size]: + chunk.extend(x for x in bytes(b'%02X' % byte)) + cs += byte + #LOG.debug('Chunk is %r' % chunk) + + vx = ((cs ^ 0xFFFF) + 1) & 0xFF + chunk.extend(x for x in bytes(b'%02X' % vx)) + self.queue(self.make_response(icf.CMD_CLONE_DAT, bytes(chunk))) + #LOG.debug('Stopping after first frame') + #break + self.queue(self.make_response(icf.CMD_CLONE_END, bytes([]))) + + def do_clone_in(self): + LOG.debug('Clone to radio started') + self._memory = bytes(b'') + + def do_clone_data(self, payload_hex): + if self.address_fmt == 'I': + header_len = 5 + else: + header_len = 3 + + def hex_to_byte(hexchars): + return int('%s%s' % (chr(hexchars[0]), chr(hexchars[1])), 16) + + payload_bytes = bytes([hex_to_byte(payload_hex[i:i+2]) + for i in range(0, len(payload_hex), 2)]) + + addr, size = struct.unpack('>%sB' % self.address_fmt, payload_bytes[:header_len]) + data = payload_bytes[header_len:-1] + csum = payload_bytes[-1] + + #addr_hex = payload[0:size_offset] + #size_hex = payload[size_offset:size_offset + 2] + #data_hex = payload[size_offset + 2:-2] + #csum_hex = payload[-2:] + + + #addr = hex_to_byte(addr_hex[0:2]) << 8 | hex_to_byte(addr_hex[2:4]) + #size = hex_to_byte(size_hex) + #csum = hex_to_byte(csum_hex) + + #data = [] + #for i in range(0, len(data_hex), 2): + # data.append(hex_to_byte(data_hex[i:i+2])) + + if len(data) != size: + LOG.debug('Invalid frame size: expected %i, but got %i' % ( + size, len(data))) + + expected_addr = len(self._memory) + if addr < expected_addr: + LOG.debug('Frame goes back to %04x from %04x' % (addr, + expected_addr)) + if len(self._memory) != addr: + LOG.debug('Filling gap between %04x and %04x' % (expected_addr, + addr)) + self._memory += (bytes(b'\x00') * (addr - expected_addr)) + + # FIXME: Check checksum + + self._memory += data + + def write(self, data): + """write() to radio, so here we process requests""" + + assert isinstance(data, bytes), 'Bytes required, %s received' % data.__class__ + + if data[:12] == (bytes(b'\xFE') * 12): + LOG.debug('Got hispeed kicker') + data = data[12:] + if data[2] == 0xFE: + return + + src = data[2] + dst = data[3] + cmd = data[4] + payload = data[5:-1] + end = data[-1] + + LOG.debug('Received command: %r' % cmd) + LOG.debug(' Full frame: %r' % data) + + model = self._radio.get_model() + bytes(b'\x00' * 20) + + if cmd == 0xE0: # Ident + # FIXME + self.queue(self.make_response(0x01, # Model + model)) + elif cmd == icf.CMD_CLONE_OUT: + self.do_clone_out() + elif cmd == icf.CMD_CLONE_IN: + self.do_clone_in() + elif cmd == icf.CMD_CLONE_DAT: + self.do_clone_data(payload) + else: + LOG.debug('Unknown command %i' % cmd) + self.queue(self.make_response(0x00, bytes([0x01]))) + + return len(data) + + def flush(self): + return |
