Skip to content

Migrate comments

hypha.apply.activity.management.commands.migrate_comments

Command

Bases: BaseCommand

help class-attribute instance-attribute

# Short description of this management command.
help = 'Comment migration script. Requires a source JSON file.'

data class-attribute instance-attribute

# Placeholder for the migration data; handle() replaces it with the parsed
# contents of the source JSON file, which process() then indexes by key.
data = []

add_arguments

add_arguments(parser)
Source code in hypha/apply/activity/management/commands/migrate_comments.py
def add_arguments(self, parser):
    """Register the positional ``source`` argument on ``parser``.

    The argument is converted with ``argparse.FileType("r")``, so the
    parsed option is a file object opened for reading.
    """
    source_reader = argparse.FileType("r")
    parser.add_argument(
        "source",
        help="Migration source JSON file",
        type=source_reader,
    )

handle

handle(*args, **options)
Source code in hypha/apply/activity/management/commands/migrate_comments.py
@transaction.atomic
def handle(self, *args, **options):
    """Load the source JSON file and process every entry it contains.

    The parsed JSON is stored on ``self.data``; each item yielded by
    iterating it is passed to ``process()``. The method is wrapped in
    ``transaction.atomic``.
    """
    with options["source"] as source_file:
        self.data = json.load(source_file)

        # process() looks each entry up in self.data again, so only the
        # iteration item itself is handed over.
        for comment_key in self.data:
            self.process(comment_key)

process

process(id)
Source code in hypha/apply/activity/management/commands/migrate_comments.py
def process(self, id):
    """Import one legacy comment as an internal ``comment`` activity.

    Args:
        id: Key into ``self.data`` identifying the comment record.

    The record is skipped, with a message on stdout, when building the
    activity raises ``ValueError`` (for example a non-numeric ``created``
    value) or when the insert raises ``IntegrityError``.
    """
    comment = self.data[id]

    # Disable auto_* on date fields so imported dates are used. This must
    # happen before the row is inserted: objects.create() saves immediately,
    # so switching the flag off afterwards is too late for that insert.
    for field in Activity._meta.local_fields:
        if field.name == "timestamp":
            field.auto_now_add = False

    try:
        # handle() runs inside transaction.atomic. The nested atomic block
        # creates a savepoint, so a failed INSERT is rolled back on its own
        # and the outer transaction stays usable for the remaining comments.
        with transaction.atomic():
            Activity.objects.create(
                timestamp=datetime.fromtimestamp(
                    int(comment["created"]), timezone.utc
                ),
                user=self.get_user(comment["uid"]),
                submission=self.get_submission(comment["nid"]),
                message=self.get_message(
                    comment["subject"], comment["comment_body"]["value"]
                ),
                type="comment",
                visibility="internal",
            )
    except IntegrityError:
        self.stdout.write(
            f"Skipped \"{comment['subject']}\" ({comment['cid']}) due to IntegrityError"
        )
    except ValueError as error:
        # Report the skip instead of dropping the record silently.
        self.stdout.write(f"Skipped comment {id} due to ValueError: {error}")
    else:
        self.stdout.write(
            f"Processed \"{comment['subject']}\" ({comment['cid']})"
        )

get_user

get_user(uid)
Source code in hypha/apply/activity/management/commands/migrate_comments.py
def get_user(self, uid):
    """Return the user whose ``drupal_id`` equals ``uid``.

    Returns ``None`` when no such user exists.
    """
    # Resolve the model outside the ``try``: if get_user_model() itself
    # raised, the ``except User.DoesNotExist`` clause would reference an
    # unbound ``User`` and mask the real error.
    User = get_user_model()
    try:
        return User.objects.get(drupal_id=uid)
    except User.DoesNotExist:
        return None

get_message

get_message(comment_subject, comment_body)
Source code in hypha/apply/activity/management/commands/migrate_comments.py
def get_message(self, comment_subject, comment_body):
    """Return the comment subject immediately followed by the comment body.

    The two values are formatted into one string with no separator.
    """
    return f"{comment_subject}{comment_body}"

get_submission

get_submission(nid)
Source code in hypha/apply/activity/management/commands/migrate_comments.py
def get_submission(self, nid):
    """Return the ApplicationSubmission whose ``drupal_id`` equals ``nid``.

    When no such submission exists, the *string* ``"None"`` is returned
    rather than the ``None`` object.
    """
    try:
        submission = ApplicationSubmission.objects.get(drupal_id=nid)
    except ApplicationSubmission.DoesNotExist:
        # NOTE(review): presumably the string is intentional, so that
        # process() hits its ValueError branch and skips comments without a
        # submission -- confirm before changing this to None.
        return "None"
    return submission

nl2br

nl2br(value)
Source code in hypha/apply/activity/management/commands/migrate_comments.py
def nl2br(self, value):
    """Return ``value`` with every CRLF pair turned into ``<br>`` plus a newline."""
    segments = value.split("\r\n")
    return "<br>\n".join(segments)