I've recently discovered the wonders of the Python world, and am quickly learning. Coming from Windows/C#/.NET, I find it refreshing working in Python on Linux. A day you've learned something new is not a day wasted.
I need to unpack data received from a device. The data arrives as a string of "bytes" of arbitrary length. Each packet (string) consists of samples for eight channels. The number of samples varies, but is always a multiple of the number of channels. The channels are interleaved. To make things a bit more complex, samples can be either 8 or 16 bits long: the lowest bit of a sample's first byte indicates whether a second byte follows (see the code below).
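To make the sample format concrete, here is a minimal sketch of how a single sample is decoded, based on what my implementation further down does: the first byte is read as a signed 8-bit value, and its lowest bit flags whether a second (unsigned) byte follows. The helper name `decode_sample` is made up for illustration, and I use a `bytearray` so that indexing yields integers on both Python 2 and Python 3.

```python
def decode_sample(buf, pos):
    """Decode one sample from buf starting at pos.

    Returns (value, next_pos). Illustrative helper, not part of any library.
    """
    # Reinterpret the first byte as a signed 8-bit integer (-128..127).
    first = ((buf[pos] + 128) % 256) - 128
    if first & 0x01:
        # Odd first byte: a second, unsigned byte follows (16-bit sample).
        return ((first >> 1) << 8) + buf[pos + 1], pos + 2
    # Even first byte: the sample fits in this single byte.
    return first >> 1, pos + 1


packet = bytearray([0x04, 0x03, 0x10, 0xFE])
print(decode_sample(packet, 0))  # (2, 1)
print(decode_sample(packet, 1))  # (272, 3)
print(decode_sample(packet, 3))  # (-1, 4)
```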
I've already got a working implementation. However, as I've just stumbled upon generators, iterators, maps and ... numpy, I suspect there might be a more efficient way of doing it. If not more efficient, then maybe more "pythonic". I'm curious, and if someone could spend some time giving me a pointer in the right (or any) direction, I would be very grateful. I am aware that my Python currently has a strong smell of C#. But I'm learning ...
My working implementation is included below. It is efficient enough, but I suspect it can be improved, especially the de-interleaving part. On my machine it prints:
time to create generator: 0:00:00.000040
time to de-interleave data: 0:00:00.004111
length of channel A is 750: True
As you can see, creating the generator takes practically no time. De-interleaving the data is the real issue. Maybe the data generation and de-interleaving can be done simultaneously?
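As far as I understand it, calling a generator function only builds the generator object; none of its body runs until the first value is requested. If that is right, the first timing measures almost nothing, and all of the unpacking work is counted in the de-interleaving time. A small sketch to illustrate, using a hypothetical `noisy_gen` function:

```python
calls = []


def noisy_gen(items):
    # Record when the body actually starts executing.
    calls.append('started')
    for item in items:
        yield item


gen = noisy_gen([1, 2, 3])
started_on_creation = bool(calls)  # body has not run yet
first = next(gen)                  # runs the body up to the first yield
started_after_next = bool(calls)

print('%s %s %s' % (started_on_creation, started_after_next, first))
```

So the two numbers above probably do not separate "unpacking" from "de-interleaving" the way I first assumed.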
This is not my first implementation, but I never seem to be able to drop below approximately 4 ms.
from datetime import datetime
def unpack_data(data):
    l = len(data)
    p = 0
    while p < l:
        # convert 'char' or byte to (signed) int8
        i1 = (((ord(data[p]) + 128) % 256) - 128)
        p += 1
        if i1 & 0x01:
            # lowest bit set: 16-bit sample, so
            # read next 'char' as an (unsigned) uint8
            #
            # due to the nature of the protocol,
            # we will always have sufficient data
            # available to avoid reading past the end
            i2 = ord(data[p])
            p += 1
            yield (i1 >> 1 << 8) + i2
        else:
            # lowest bit clear: 8-bit sample
            yield i1 >> 1
# generate some test data ...
test_data = ''
for n in range(500 * 12 * 2 - 1):
    test_data += chr(n % 256)
t0 = datetime.utcnow()
# in this example we have 6000 samples, 8 channels, 750 samples/channel
# data received is interleaved: A1, B1, C1, ..., A2, B2, C2, ... F750, G750, H750
channels = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H')
samples = {channel: [] for channel in channels}
# call unpack_data(), receive a generator
gen = unpack_data(test_data)
t1 = datetime.utcnow()
print 'time to create generator: %s' % (t1-t0)
try:
    while True:
        for channel in channels:
            samples[channel].append(gen.next())
except StopIteration:
    pass
print 'time to de-interleave data: %s' % (datetime.utcnow()-t1)
print 'length of channel A is 750: %s' % (len(samples['A']) == 750)
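One alternative I have been considering for the de-interleaving step: drain the generator into a flat list first, then pick out each channel with an extended slice (`flat[i::8]`), so the per-sample `append` calls disappear. This is only a sketch; `deinterleave` is a name I made up, a plain list of integers stands in for the output of `unpack_data()`, and I have not measured whether it actually gets below 4 ms.

```python
def deinterleave(flat, channels):
    """Split an interleaved list (A1, B1, ..., A2, B2, ...) per channel."""
    n = len(channels)
    # flat[i::n] takes every n-th sample, starting at channel offset i.
    return {channel: flat[i::n] for i, channel in enumerate(channels)}


channels = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H')
flat = list(range(6000))  # stand-in for list(unpack_data(test_data))
samples = deinterleave(flat, channels)

print(len(samples['A']))  # 750
print(samples['B'][:3])   # [1, 9, 17]
```

With the real data this would presumably be called as `deinterleave(list(unpack_data(test_data)), channels)`.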