Hallo Leute,
für mein PIGPIO-Modul versuche ich einen Code vom PIGPIO-Entwickler selbst für ein modul zu adaptieren.
Hier fehlt mir aber das Verständnis, wie der Code funktioniert:
#!/usr/bin/env python
import time
import pigpio
def bit_get(word, n):
"""
Get bit n of word.
The least significant bit is bit 0.
"""
return (word>>n)&1
def bit_set(word, n, value):
"""
Set bit n of word to value.
The least significant bit is bit 0.
"""
value = (value&1L)<<n
mask = (1L)<<n
return (word & ~mask) | value
def bits_get(word, lsb, msb):
"""
Get bits lsb to msb of word.
The least significant bit is bit 0.
"""
mask = 2L**(msb + 1 - lsb) - 1
return (word >> lsb) & mask
def bits_set(word, lsb, msb, value):
"""
Set bits lsb to msb of word to value.
The least significant bit is bit 0.
"""
mask = 2L**(msb + 1 - lsb) - 1
value = (value & mask) << lsb
mask = mask << lsb
return (word & ~mask) | value
def bits_reverse(word, lsb, msb):
"""
Get bits lsb to msb of word and reverse them.
The least significant bit is bit 0.
"""
result = 0
for i in range(lsb, msb+1):
if (word >> i) & 1: result |= 1 << (msb - i)
return result
def _carrier(gpio, frequency, micros, dutycycle=0.5):
"""
Generate cycles of carrier on gpio with frequency and dutycycle.
"""
wf = []
cycle = 1000000.0 / frequency
cycles = int(round(micros/cycle))
on = int(round(cycle * dutycycle))
sofar = 0
for c in range(cycles):
target = int(round((c+1)*cycle))
sofar += on
off = target - sofar
sofar += off
wf.append(pigpio.pulse(1<<gpio, 0, on))
wf.append(pigpio.pulse(0, 1<<gpio, off))
return wf
class pwc:
"""
Pulse width coding.
"""
def __init__(self,
pi, gpio, freq, mark_0, mark_1, space, duty=0.5):
self._pi = pi
self._gpio = gpio
pi.set_mode(gpio, pigpio.OUTPUT)
pi.wave_add_new()
# mark - 0 bit
pi.wave_add_generic(_carrier(gpio, freq, mark_0, duty))
self._w_mark_0 = pi.wave_create()
# mark - 1 bit
pi.wave_add_generic(_carrier(gpio, freq, mark_1, duty))
self._w_mark_1 = pi.wave_create()
# space
pi.wave_add_generic([pigpio.pulse(0, 0, space)])
self._w_space = pi.wave_create()
self._bit = [[self._w_mark_0, self._w_space],
[self._w_mark_1, self._w_space]]
def format(self, data, bits):
chain = []
for i in range(bits-1, -1, -1):
chain += self._bit[(data>>i) & 1]
return chain
def w_mark_0(self):
return self._w_mark_0
def w_mark_1(self):
return self._w_mark_1
def w_space(self):
return self._w_space
def cancel(self):
if self._w_mark_0 is not None:
self._pi.wave_delete(self._w_mark_0)
self._w_mark_0 = None
if self._w_mark_1 is not None:
self._pi.wave_delete(self._w_mark_1)
self._w_mark_1 = None
if self._w_space is not None:
self._pi.wave_delete(self._w_space)
self._w_space = None
class SonySIRC12:
"""
"""
def __init__(self, pi, gpio, address=0):
self._pi = pi
self._gpio = gpio
self._address = bits_reverse(address&31, 0, 4)
self._pwc = pwc(pi, gpio, 40000, 600, 1200, 600)
def set_address(self, address):
self._address = bits_reverse(address&31, 0, 4)
def send_raw(self, data, bits, repeats=1):
chain = [
self._pwc.w_mark_1(), self._pwc.w_mark_1(), # 2.4 ms mark
self._pwc.w_space()] # 0.6 ms space
chain += self._pwc.format(data, bits)
print(chain)
for i in range(repeats):
self._pi.wave_chain(chain)
time.sleep(0.108)
def send(self, command, repeats=1):
command = bits_reverse(command&127, 0, 6)
data = (command<<5) | self._address
self.send_raw(data, 12, repeats)
def cancel(self):
self._pwc.cancel()
class pdc:
"""
Pulse distance coding.
"""
def __init__(self,
pi, gpio, freq, mark, bit_0, bit_1, duty=0.5):
self._pi = pi
self._gpio = gpio
pi.set_mode(gpio, pigpio.OUTPUT)
pi.wave_add_new()
# mark
pi.wave_add_generic(_carrier(gpio, freq, mark, duty))
self._w_mark = pi.wave_create()
# space - 0 bit
pi.wave_add_generic([pigpio.pulse(0, 0, bit_0)])
self._wid_bit_0 = pi.wave_create()
# space - 1 bit
pi.wave_add_generic([pigpio.pulse(0, 0, bit_1)])
self._wid_bit_1 = pi.wave_create()
self._bit = [[self._w_mark, self._wid_bit_0],
[self._w_mark, self._wid_bit_1]]
def format(self, data, bits):
chain = []
for i in range(bits-1, -1, -1):
chain += self._bit[(data>>i) & 1]
return chain
def w_mark(self):
return self._w_mark
def wid_bit_0(self):
return self._wid_bit_0
def wid_bit_1(self):
return self._wid_bit_1
def cancel(self):
if self._w_mark is not None:
self._pi.wave_delete(self._w_mark)
self._w_mark = None
if self._wid_bit_0 is not None:
self._pi.wave_delete(self._wid_bit_0)
self._wid_bit_0 = None
if self._wid_bit_1 is not None:
self._pi.wave_delete(self._wid_bit_1)
self._wid_bit_1 = None
class Sharp:
"""
"""
def __init__(self, pi, gpio, address=0):
self._pi = pi
self._gpio = gpio
self._address = bits_reverse(address&31, 0, 4)
self._pdc = pdc(pi, gpio, 38000, 320, 680, 1680)
def set_address(self, address):
self._address = bits_reverse(address&31, 0, 4)
def send_raw(self, data1, data2, bits, repeats=1):
chain = (
self._pdc.format(data1, bits) +
[255, 2, 0x40, 0x9C] +
self._pdc.format(data2, bits))
print(chain)
for i in range(repeats):
self._pi.wave_chain(chain)
time.sleep(0.108)
def send(self, command, repeats=1):
command = bits_reverse(command&255, 0, 7)
cmdexpchk = (command<<2) | 0b10
inverse = cmdexpchk^0x3FF
data1 = (self._address<<10) | cmdexpchk
data2 = (self._address<<10) | inverse
self.send_raw(data1, data2, 15, repeats)
def cancel(self):
self._pdc.cancel()
class Samsung:
"""
"""
def __init__(self, pi, gpio, address=0):
self._pi = pi
self._gpio = gpio
self._address = bits_reverse(address&255, 0, 7)
self._pdc = pdc(pi, gpio, 38000, 580, 580, 1740)
def set_address(self, address):
self._address = bits_reverse(address&255, 0, 7)
def send_raw(self, data, bits, repeats=1):
chain = [
255, 0, self._pdc.w_mark(), 255, 1, 8, 0, # 4.5 ms mark
255, 2, 0x94, 0x11] # 4.5 ms space
chain += self._pdc.format(data, bits)
print(chain)
for i in range(repeats):
self._pi.wave_chain(chain)
time.sleep(0.108)
def send(self, command, repeats=1):
command = bits_reverse(command&255, 0, 7)
data = (
(self._address<<25) |
(self._address<<17) |
(command<<9) |
((command^255)<<1) )
self.send_raw(data, 33, repeats)
def cancel(self):
self._pdc.cancel()
class NEC:
"""
"""
def __init__(self, pi, gpio, address=0):
self._pi = pi
self._gpio = gpio
self._address = bits_reverse(address&255, 0, 7)
self._pdc = pdc(pi, gpio, 38000, 553, 572, 1697)
def set_address(self, address):
self._address = bits_reverse(address&255, 0, 7)
def send_raw(self, data, bits, repeats=1):
chain = [
255, 0, self._pdc.w_mark(), 255, 1, 16, 0, # 9 ms mark
255, 2, 0x94, 0x11] # 4.5 ms space
chain += self._pdc.format(data, bits)
print(chain)
for i in range(repeats):
self._pi.wave_chain(chain)
time.sleep(0.1)
def send(self, command, repeats=1):
command = bits_reverse(command&255, 0, 7)
data = (
(self._address<<25) |
((self._address^255)<<17) |
(command<<9) |
((command^255)<<1) )
self.send_raw(data, 33, repeats)
def cancel(self):
self._pdc.cancel()
class SANYO:
"""
"""
def __init__(self, pi, gpio, address=0):
self._pi = pi
self._gpio = gpio
self._address = bits_reverse(address&0x1FFF, 0, 12)
self._pdc = pdc(pi, gpio, 38000, 553, 553, 1659)
def set_address(self, address):
self._address = bits_reverse(address&0x1FFF, 0, 12)
def send_raw(self, data, bits, repeats=1):
chain = [
255, 0, self._pdc.w_mark(), 255, 1, 16, 0, # 9 ms mark
255, 2, 0x94, 0x11] # 4.5 ms space
chain += self._pdc.format(data, bits)
print(chain)
for i in range(repeats):
self._pi.wave_chain(chain)
time.sleep(0.1)
def send(self, command, repeats=1):
command = bits_reverse(command&255, 0, 7)
data = (
(self._address<<29) |
((self._address^0x1FFF)<<16) |
(command<<8) |
(command^255) )
self.send_raw(data, 42, repeats)
def cancel(self):
self._pdc.cancel()
class JVC:
"""
"""
def __init__(self, pi, gpio, address=0):
self._pi = pi
self._gpio = gpio
self._address = bits_reverse(address&255, 0, 7)
self._pdc = pdc(pi, gpio, 38000, 526, 524, 1574)
def set_address(self, address):
self._address = bits_reverse(address&255, 0, 7)
def send_raw(self, data, bits, repeats=1):
chain = [
255, 0, self._pdc.w_mark(), 255, 1, 16, 0, # 8.4 ms mark
255, 2, 0x68, 0x10] # 4.2 ms space
chain += self._pdc.format(data, bits)
print(chain)
self._pi.wave_chain(chain)
time.sleep(0.055)
chain = self._pdc.format(data, bits)
for i in range(repeats-1):
self._pi.wave_chain(chain)
time.sleep(0.055)
def send(self, command, repeats=1):
command = bits_reverse(command&255, 0, 7)
data = (self._address<<8) | command
self.send_raw(data, 16, repeats)
def cancel(self):
self._pdc.cancel()
class bip:
"""
Bi-phase (Manchester) coding.
"""
def __init__(self,
pi, gpio, freq, mark, space, rising_1, duty=0.5):
self._pi = pi
self._gpio = gpio
self._rising_1 = rising_1
pi.set_mode(gpio, pigpio.OUTPUT)
pi.wave_add_new()
# mark
pi.wave_add_generic(_carrier(gpio, freq, mark, duty))
self._w_mark = pi.wave_create()
# space
pi.wave_add_generic([pigpio.pulse(0, 0, space)])
self._w_space = pi.wave_create()
if rising_1:
self._bit = [[self._w_mark, self._w_space],
[self._w_space, self._w_mark]]
else:
self._bit = [[self._w_space, self._w_mark],
[self._w_mark, self._w_space]]
def format(self, data, bits):
chain = []
for i in range(bits-1, -1, -1):
chain += self._bit[(data>>i) & 1]
return chain
def w_space(self):
return self._w_space
def w_mark(self):
return self._w_mark
def cancel(self):
if self._w_mark is not None:
self._pi.wave_delete(self._w_mark)
self._w_mark = None
if self._w_space is not None:
self._pi.wave_delete(self._w_space)
self._w_space = None
class NRC17:
"""
Nokia.
"""
def __init__(self, pi, gpio, address=0, subcode=0):
self._pi = pi
self._gpio = gpio
self._address = bits_reverse(address&15, 0, 3)
self._subcode = bits_reverse(subcode&15, 0, 3)
self._bip = bip(pi, gpio, 38000, 500, 500, False)
def set_address(self, address):
self._address = bits_reverse(address&15, 0, 3)
def set_subcode(self, subcode):
self._subcode = bits_reverse(subcode&15, 0, 3)
def send_raw(self, data, bits, repeats=1):
start_stop = [
self._bip.w_mark(), # 0.5 ms mark
255, 2, 0xC4, 0x09] # 2.5 ms space
ss = 1<<16 | 0x7F<<8 | 0xFF
start_stop += self._bip.format(ss, 17)
chain = [
self._bip.w_mark(), # 0.5 ms mark
255, 2, 0xC4, 0x09] # 2.5 ms space
chain += self._bip.format(data, bits)
print(chain)
self._pi.wave_chain(start_stop)
time.sleep(0.04)
for i in range(repeats):
self._pi.wave_chain(chain)
time.sleep(0.1)
self._pi.wave_chain(start_stop)
time.sleep(0.04)
def send(self, command, repeats=1):
command = bits_reverse(command&255, 0, 7)
data = (
(1<<16) |
(command<<8) |
(self._address<<4) |
self._subcode )
self.send_raw(data, 17, repeats)
def cancel(self):
self._bip.cancel()
class RC5:
"""
"""
def __init__(self, pi, gpio, address=0):
self._pi = pi
self._gpio = gpio
self._address = address&31
self._toggle = 0
self._bip = bip(pi, gpio, 36000, 889, 889, True)
def set_address(self, address):
self._address = address&31
def send_raw(self, data, bits, repeats=1):
print(bin(data), bits, repeats)
chain = self._bip.format(data, bits)
#print(chain)
for i in range(repeats):
self._pi.wave_chain(chain)
time.sleep(0.1)
def send(self, command, repeats=1):
command &= 63
if self._toggle:
self._toggle = 0
else:
self._toggle = 1
data = (3<<12) | (self._toggle<<11) | (self._address<<6) | command
self.send_raw(data, 14, repeats)
def cancel(self):
self._bip.cancel()
class RC5X:
"""
"""
def __init__(self, pi, gpio, address=0):
self._pi = pi
self._gpio = gpio
self._address = address&31
self._toggle = 0
self._bip = bip(pi, gpio, 36000, 889, 889, True)
def set_address(self, address):
self._address = address&31
def send_raw(self, data, bits, repeats=1):
chain = self._bip.format(data, bits)
print(chain)
for i in range(repeats):
self._pi.wave_chain(chain)
time.sleep(0.1)
def send(self, command, repeats=1):
if self._toggle:
self._toggle = 0
else:
self._toggle = 1
command &= 127
if command & 64:
field = 0
command &= 63
else:
field = 1
data = (
(1<<13) |
(field<<12) |
(self._toggle<<11) |
(self._address<<6) |
command )
self.send_raw(data, 14, repeats)
def cancel(self):
self._bip.cancel()
class RC6_0:
"""
"""
def __init__(self, pi, gpio, address=0):
self._pi = pi
self._gpio = gpio
self._address = address&255
self._toggle = 0
self._bip = bip(pi, gpio, 36000, 889, 889, False)
def set_address(self, address):
self._address = address&255
def send_raw(self, data, bits, repeats=1):
mark = self._bip.w_mark()
space = self._bip.w_space()
if self._toggle:
trailer = [mark, mark, space, space]
else:
trailer = [space, space, mark, mark]
chain = [mark, mark, mark, space]
chain += self._bip.format(0b1000, 4)
chain += trailer
chain += self._bip.format(data, bits)
print(chain)
for i in range(repeats):
self._pi.wave_chain(chain)
time.sleep(0.1)
def send(self, command, repeats=1):
if self._toggle:
self._toggle = 0
else:
self._toggle = 1
command &= 255
data = (self._address<<8) | command
self.send_raw(data, 16, repeats)
def cancel(self):
self._bip.cancel()
class RC6_6A_SCC:
"""
"""
def __init__(self, pi, gpio, address=0):
self._pi = pi
self._gpio = gpio
self._address = address&127
self._bip = bip(pi, gpio, 36000, 889, 889, False)
def set_address(self, address):
self._address = address&127
def send_raw(self, data, bits, repeats=1):
mark = self._bip.w_mark()
space = self._bip.w_space()
chain = [mark, mark, mark, space]
chain += self._bip.format(0b1110, 4)
chain += [space, space, mark, mark] # Trailer (6A=0, 6B=1).
chain += self._bip.format(data, bits)
print(chain)
for i in range(repeats):
self._pi.wave_chain(chain)
time.sleep(0.1)
def send(self, command, bits, repeats=1):
command &= ((1<<bits)-1)
data = (self._address<<bits) | command
self.send_raw(data, bits+8, repeats)
def cancel(self):
self._bip.cancel()
class RC6_6A_LCC:
"""
"""
def __init__(self, pi, gpio, address=0):
self._pi = pi
self._gpio = gpio
self._address = address&0x7FFF
self._bip = bip(pi, gpio, 36000, 889, 889, False)
def set_address(self, address):
self._address = address&0x7FFF
def send_raw(self, data, bits, repeats=1):
mark = self._bip.w_mark()
space = self._bip.w_space()
chain = [mark, mark, mark, space]
chain += self._bip.format(0b1110, 4)
chain += [space, space, mark, mark] # Trailer (6A=0, 6B=1).
chain += self._bip.format(data, bits)
print(chain)
for i in range(repeats):
self._pi.wave_chain(chain)
time.sleep(0.1)
def send(self, command, bits, repeats=1):
command &= ((1<<bits)-1)
data = (self._address<<bits) | command
self.send_raw(data, bits+16, repeats)
def cancel(self):
self._bip.cancel()
class SamsungRead:
"""
Decodes a Samsung remote control (32 bits, 16 address, 16 command).
"""
def __init__(self, pi, gpio):
self._pi = pi
self._gpio = gpio
self._lt = None
self._code = 0
self._bits = 0
pi.set_mode(gpio, pigpio.INPUT)
self._cb = pi.callback(gpio, pigpio.EITHER_EDGE, self._cbf)
def _cbf(self, g, l, t):
if self._lt is not None:
td = pigpio.tickDiff(self._lt, t)
if l == 0:
self._code = self._code << 1
self._bits += 1
if td > 3000:
self._code = 0
self._bits = 0
elif td > 1000:
self._code = self._code | 1
if self._bits == 32:
print(hex(self._code))
self._lt = t
class ITT:
"""
"""
def __init__(self, pi, gpio, address=0):
self._pi = pi
self._gpio = gpio
self._address = address
pi.set_mode(gpio, pigpio.OUTPUT)
def set_address(self, address):
self._address = address
def send(self, command, repeats=1):
wf = [pigpio.pulse(1<<self._gpio, 0, 10),
pigpio.pulse(0, 1<<self._gpio, 300),
pigpio.pulse(1<<self._gpio, 0, 10),
pigpio.pulse(0, 1<<self._gpio, 100)]
data = (self._address<<6) | command
for i in range(9, -1, -1):
wf.append(pigpio.pulse(1<<self._gpio, 0, 10))
if data & (1<<i):
wf.append(pigpio.pulse(0, 1<<self._gpio, 200))
else:
wf.append(pigpio.pulse(0, 1<<self._gpio, 100))
wf.append(pigpio.pulse(1<<self._gpio, 0, 10))
wf.append(pigpio.pulse(0, 1<<self._gpio, 300))
wf.append(pigpio.pulse(1<<self._gpio, 0, 10))
wf.append(pigpio.pulse(0, 1<<self._gpio, 0))
self._pi.wave_add_generic(wf)
wid1 = self._pi.wave_create()
wf = [pigpio.pulse(1<<self._gpio, 0, 10),
pigpio.pulse(0, 1<<self._gpio, 300),
pigpio.pulse(1<<self._gpio, 0, 10),
pigpio.pulse(0, 1<<self._gpio, 100)]
data = ((self._address^15)<<6) | command
for i in range(9, -1, -1):
wf.append(pigpio.pulse(1<<self._gpio, 0, 10))
if data & (1<<i):
wf.append(pigpio.pulse(0, 1<<self._gpio, 200))
else:
wf.append(pigpio.pulse(0, 1<<self._gpio, 100))
wf.append(pigpio.pulse(1<<self._gpio, 0, 10))
wf.append(pigpio.pulse(0, 1<<self._gpio, 300))
wf.append(pigpio.pulse(1<<self._gpio, 0, 10))
wf.append(pigpio.pulse(0, 1<<self._gpio, 0))
self._pi.wave_add_generic(wf)
wid2 = self._pi.wave_create()
self._pi.wave_send_once(wid1)
time.sleep(0.13)
for i in range(repeats-1):
self._pi.wave_send_once(wid2)
time.sleep(0.13)
self._pi.wave_delete(wid1)
self._pi.wave_delete(wid2)
def cancel(self):
pass
if __name__ == "__main__":
import ir_protocol
GPIO=23
GPIO_RX=8
pi = pigpio.pi()
#r = SamsungRead(pi, GPIO_RX)
#time.sleep(60)
GPIO_mode = pi.get_mode(GPIO)
s1 = ir_protocol.RC6_0(pi, GPIO)
s2 = ir_protocol.RC5(pi, GPIO)
for a in range(256):
s1.set_address(a)
s2.set_address(a)
for c in range(256):
s1.send(c, 2)
s2.send(c, 2)
time.sleep(0.2)
print(a, c)
s.cancel()
pi.set_mode(GPIO, GPIO_mode)
pi.stop()
Wie würde ich hier vorgehen, wenn ich z.B. mit der class RC5: einen Befehl absetzen wollen würde?
Joachim