Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Prusa MK3 firmware language pack #325

Open
wants to merge 2 commits into
base: devel
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 101 additions & 47 deletions octoprint_firmwareupdater/methods/avrdude.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import sarge
import serial
import tempfile
import time

try:
Expand Down Expand Up @@ -65,6 +66,52 @@ def _check_avrdude(self):
else:
return True

def _run_avrdude_cmd(self, cmd, cwd):
self._logger.info(u"Running '{}' in {}".format(cmd, cwd))
self._console_logger.info(u"")
self._console_logger.info(cmd)

p = sarge.run(cmd, cwd=cwd, async_=True, stdout=sarge.Capture(), stderr=sarge.Capture())
p.wait_events()

while p.returncode is None:
output = p.stderr.read(timeout=0.5).decode('utf-8')
if not output:
p.commands[0].poll()
continue

for line in output.split("\n"):
if line.endswith("\r"):
line = line[:-1]
self._console_logger.info(u"> {}".format(line))

if AVRDUDE_WRITING in output:
self._logger.info(u"Writing memory...")
self._send_status("progress", subtype="writing")
elif AVRDUDE_VERIFYING in output:
self._logger.info(u"Verifying memory...")
self._send_status("progress", subtype="verifying")
elif AVRDUDE_TIMEOUT in output:
p.commands[0].kill()
p.close()
raise FlashException("Timeout communicating with programmer")
elif AVRDUDE_ERROR_DEVICE in output:
p.commands[0].kill()
p.close()
raise FlashException("Error opening serial device")
elif AVRDUDE_ERROR_VERIFICATION in output:
p.commands[0].kill()
p.close()
raise FlashException("Error verifying flash")
elif AVRDUDE_ERROR_SYNC in output:
p.commands[0].kill()
p.close()
raise FlashException("Avrdude says: 'not in sync" + output[output.find(AVRDUDE_ERROR_SYNC) + len(AVRDUDE_ERROR_SYNC):].strip() + "'")
elif AVRDUDE_ERROR in output:
raise FlashException("Avrdude error: " + output[output.find(AVRDUDE_ERROR) + len(AVRDUDE_ERROR):].strip())

return p.returncode

def _flash_avrdude(self, firmware=None, printer_port=None, **kwargs):
assert(firmware is not None)
assert(printer_port is not None)
Expand All @@ -76,12 +123,13 @@ def _flash_avrdude(self, firmware=None, printer_port=None, **kwargs):
avrdude_baudrate = self.get_profile_setting("avrdude_baudrate")
avrdude_disableverify = self.get_profile_setting_boolean("avrdude_disableverify")

avrdude_commands_extra = []

working_dir = os.path.dirname(avrdude_path)

avrdude_command = self.get_profile_setting("avrdude_commandline")
avrdude_command = avrdude_command.replace("{avrdude}", avrdude_path)
avrdude_command = avrdude_command.replace("{mcu}", avrdude_avrmcu)
avrdude_command = avrdude_command.replace("{programmer}", avrdude_programmer)

if avrdude_conf is not None and avrdude_conf != "":
avrdude_command = avrdude_command.replace("{conffile}", avrdude_conf)
Expand Down Expand Up @@ -176,57 +224,60 @@ def _flash_avrdude(self, firmware=None, printer_port=None, **kwargs):
self._logger.info(u"CW1 not in bootloader")
raise FlashException("Reboot CW1 to bootloader failed")

mk3_fw = None
if (avrdude_path is not None and "prusa" in avrdude_path and
avrdude_programmer is not None and avrdude_programmer == "wiring"):
try:
list_ports.comports()
app_port_name = list(list_ports.grep(USB_VID_PID_MK3))[0][0]
self._logger.info(u"Found Prusa MK3 device: {}. Checking firmware file.".format(app_port_name))
except Exception as e:
self._logger.info(u"blah {}".format(e))
pass
else:
# Prusa MK3 USB device found. Check firmware file.
prusa_mk3_section_marker = ":00000001FF"
with open(firmware,"r+") as f:
mk3_sections = 0
lines = f.readlines()
for i in lines:
if i.strip() == prusa_mk3_section_marker:
mk3_sections += 1
if mk3_sections == 2:
self._logger.info(u"Firmware file seems compatible with Prusa MK3.")
mk3_sections = 0
mk3_fw = tempfile.NamedTemporaryFile(mode='r+')
for i in lines:
# Go past first marker.
if mk3_sections == 0:
if i.strip() == prusa_mk3_section_marker:
mk3_sections += 1
continue
# Copy second part of firmware file.
mk3_fw.write(i)
mk3_fw.flush()
mk3_command = avrdude_command + " -u"
mk3_command = mk3_command.replace("{port}", printer_port)
mk3_command = mk3_command.replace("{firmware}", mk3_fw.name)
mk3_command = mk3_command.replace("{programmer}", "arduino")

avrdude_commands_extra.append(mk3_command)
else:
self._logger.info(u"Non Prusa MK3 firmware file.")

avrdude_command = avrdude_command.replace("{port}", printer_port)
avrdude_command = avrdude_command.replace("{firmware}", firmware)
avrdude_command = avrdude_command.replace("{programmer}", avrdude_programmer)

self._logger.info(u"Running '{}' in {}".format(avrdude_command, working_dir))
self._console_logger.info(u"")
self._console_logger.info(avrdude_command)
commands = [ avrdude_command ] + avrdude_commands_extra

try:
p = sarge.run(avrdude_command, cwd=working_dir, async_=True, stdout=sarge.Capture(), stderr=sarge.Capture())
p.wait_events()

while p.returncode is None:
output = p.stderr.read(timeout=0.5).decode('utf-8')
if not output:
p.commands[0].poll()
continue

for line in output.split("\n"):
if line.endswith("\r"):
line = line[:-1]
self._console_logger.info(u"> {}".format(line))

if AVRDUDE_WRITING in output:
self._logger.info(u"Writing memory...")
self._send_status("progress", subtype="writing")
elif AVRDUDE_VERIFYING in output:
self._logger.info(u"Verifying memory...")
self._send_status("progress", subtype="verifying")
elif AVRDUDE_TIMEOUT in output:
p.commands[0].kill()
p.close()
raise FlashException("Timeout communicating with programmer")
elif AVRDUDE_ERROR_DEVICE in output:
p.commands[0].kill()
p.close()
raise FlashException("Error opening serial device")
elif AVRDUDE_ERROR_VERIFICATION in output:
p.commands[0].kill()
p.close()
raise FlashException("Error verifying flash")
elif AVRDUDE_ERROR_SYNC in output:
p.commands[0].kill()
p.close()
raise FlashException("Avrdude says: 'not in sync" + output[output.find(AVRDUDE_ERROR_SYNC) + len(AVRDUDE_ERROR_SYNC):].strip() + "'")
elif AVRDUDE_ERROR in output:
raise FlashException("Avrdude error: " + output[output.find(AVRDUDE_ERROR) + len(AVRDUDE_ERROR):].strip())

if p.returncode == 0:
return True
else:
raise FlashException("Avrdude returned code {returncode}".format(returncode=p.returncode))
for cmd in commands:
r = _run_avrdude_cmd(self, cmd, cwd=working_dir)
if r != 0:
raise FlashException("Avrdude returned code {returncode}".format(returncode=r))
time.sleep(1)
return True

except FlashException as ex:
self._logger.error(u"Flashing failed. {error}.".format(error=ex.reason))
Expand All @@ -236,6 +287,9 @@ def _flash_avrdude(self, firmware=None, printer_port=None, **kwargs):
self._logger.exception(u"Flashing failed. Unexpected error.")
self._send_status("flasherror")
return False
finally:
if mk3_fw:
mk3_fw.close()

class FlashException(Exception):
def __init__(self, reason, *args, **kwargs):
Expand Down