Skip to content

Submissions

hypha.apply.funds.models.submissions

JSONOrderable

Bases: QuerySet

json_field class-attribute instance-attribute

json_field = ''

order_by

order_by(*field_names)
Source code in hypha/apply/funds/models/submissions.py
def order_by(self, *field_names):
    """Order the queryset, sorting named-block fields by their JSON value.

    Every entry of ``field_names`` whose name (with hyphens removed) is in
    ``NAMED_BLOCKS`` is replaced by a case-insensitive ordering on the
    matching key of the ``json_field`` column, with NULLs sorted last. A
    leading ``-`` requests descending order. All other entries are passed
    to the parent ``order_by`` unchanged.

    Raises:
        ValueError: if ``json_field`` is blank.
    """
    if not self.json_field:
        raise ValueError(
            "json_field cannot be blank, please provide a field on which to perform the ordering"
        )

    def build_json_order_by(field):
        try:
            is_named_block = field.replace("-", "") in NAMED_BLOCKS
        except AttributeError:
            # No ``.replace`` means this is not a string; pass it through.
            return field
        if not is_named_block:
            return field

        descending = field[0] == "-"
        json_key = field[1:] if descending else field
        table_name = self.model._meta.db_table
        # Only the table and column names are interpolated into the SQL;
        # the JSON key itself is supplied as a query parameter.
        expression = RawSQL(
            f"LOWER({table_name}.{self.json_field}->>%s)", (json_key,)
        )
        return OrderBy(expression, descending=descending, nulls_last=True)

    return super().order_by(*map(build_json_order_by, field_names))

ApplicationSubmissionQueryset

Bases: JSONOrderable

json_field class-attribute instance-attribute

json_field = 'form_data'

order_by

order_by(*field_names)
Source code in hypha/apply/funds/models/submissions.py
def order_by(self, *field_names):
    """Order the queryset, sorting named-block fields by their JSON value.

    Every entry of ``field_names`` whose name (with hyphens removed) is in
    ``NAMED_BLOCKS`` is replaced by a case-insensitive ordering on the
    matching key of the ``json_field`` column, with NULLs sorted last. A
    leading ``-`` requests descending order. All other entries are passed
    to the parent ``order_by`` unchanged.

    Raises:
        ValueError: if ``json_field`` is blank.
    """
    if not self.json_field:
        raise ValueError(
            "json_field cannot be blank, please provide a field on which to perform the ordering"
        )

    def build_json_order_by(field):
        try:
            is_named_block = field.replace("-", "") in NAMED_BLOCKS
        except AttributeError:
            # No ``.replace`` means this is not a string; pass it through.
            return field
        if not is_named_block:
            return field

        descending = field[0] == "-"
        json_key = field[1:] if descending else field
        table_name = self.model._meta.db_table
        # Only the table and column names are interpolated into the SQL;
        # the JSON key itself is supplied as a query parameter.
        expression = RawSQL(
            f"LOWER({table_name}.{self.json_field}->>%s)", (json_key,)
        )
        return OrderBy(expression, descending=descending, nulls_last=True)

    return super().order_by(*map(build_json_order_by, field_names))

active

active()
Source code in hypha/apply/funds/models/submissions.py
def active(self):
    """Return non-archived submissions whose status is in ``active_statuses``."""
    criteria = {"status__in": active_statuses, "is_archive": False}
    return self.filter(**criteria)

inactive

inactive()
Source code in hypha/apply/funds/models/submissions.py
def inactive(self):
    """Return submissions that are neither in an active status nor archived."""
    has_active_status = Q(status__in=active_statuses)
    is_archived = Q(is_archive=True)
    return self.exclude(has_active_status | is_archived)

in_community_review

in_community_review(user)
Source code in hypha/apply/funds/models/submissions.py
def in_community_review(self, user):
    """Return distinct submissions open for community review by ``user``.

    Keeps submissions whose status is in ``COMMUNITY_REVIEW_PHASES``, that
    were not submitted by ``user``, and for which ``user`` either has not
    authored a review or has authored one that is still a draft. Submissions
    where ``user`` has recorded an ``AGREE`` opinion are then excluded.
    """
    in_phase = Q(status__in=COMMUNITY_REVIEW_PHASES)
    not_own_submission = ~Q(user=user)
    review_outstanding = ~Q(reviews__author=user) | (
        Q(reviews__author=user) & Q(reviews__is_draft=True)
    )
    # All three conditions are passed to a single filter() call.
    candidates = self.filter(in_phase, not_own_submission, review_outstanding)
    without_agreement = candidates.exclude(
        reviews__opinions__opinion=AGREE, reviews__opinions__author=user
    )
    return without_agreement.distinct()

in_review

in_review()
Source code in hypha/apply/funds/models/submissions.py
def in_review(self):
    """Return the submissions whose status is one of ``review_statuses``."""
    statuses = review_statuses
    return self.filter(status__in=statuses)

in_review_for

in_review_for(user, assigned=True)
Source code in hypha/apply/funds/models/submissions.py
def in_review_for(self, user, assigned=True):
    """Return distinct submissions that still need a review from ``user``.

    Keeps submissions whose status is in ``get_review_active_statuses(user)``
    and for which ``user`` either has no review or only a draft review.
    With ``assigned`` true (the default) the result is further limited to
    submissions that list ``user`` as a reviewer, and submissions where
    ``user`` agreed with an existing review are dropped.
    """
    statuses = get_review_active_statuses(user)
    review_outstanding = ~Q(reviews__author__reviewer=user) | (
        Q(reviews__author__reviewer=user) & Q(reviews__is_draft=True)
    )
    pending = self.prefetch_related("reviews__author__reviewer").filter(
        Q(status__in=statuses), review_outstanding
    )
    if not assigned:
        return pending.distinct()

    pending = pending.filter(reviewers=user)
    # If this user has agreed with a review, then they have reviewed this submission already
    pending = pending.exclude(
        reviews__opinions__opinion=AGREE,
        reviews__opinions__author__reviewer=user,
    )
    return pending.distinct()

for_reviewer_settings

for_reviewer_settings(user, reviewer_settings)
Source code in hypha/apply/funds/models/submissions.py
def for_reviewer_settings(self, user, reviewer_settings):
    """Narrow the queryset according to ``reviewer_settings``.

    Applies, in order: the ``submission`` setting ("reviewed"), the
    ``state`` setting ("ext_state_or_higher" / "ext_state_only"), the
    ``outcome`` setting ("accepted" / "all_except_dismissed") and the
    ``assigned`` flag. The result is returned with ``distinct()`` applied.
    """
    narrowed = self

    if reviewer_settings.submission == "reviewed":
        narrowed = narrowed.reviewed_by(user)

    state = reviewer_settings.state
    if state == "ext_state_or_higher":
        narrowed = narrowed.filter(status__in=ext_or_higher_statuses)
    elif state == "ext_state_only":
        narrowed = narrowed.filter(status__in=ext_review_statuses)

    outcome = reviewer_settings.outcome
    if outcome == "accepted":
        narrowed = narrowed.filter(status__in=accepted_statuses)
    elif outcome == "all_except_dismissed":
        narrowed = narrowed.exclude(status__in=dismissed_statuses)

    if reviewer_settings.assigned:
        narrowed = narrowed.filter(reviewers=user)

    return narrowed.distinct()

reviewed_by

reviewed_by(user)
Source code in hypha/apply/funds/models/submissions.py
def reviewed_by(self, user):
    """Return submissions that have a review whose author's reviewer is ``user``."""
    lookup = {"reviews__author__reviewer": user}
    return self.filter(**lookup)

flagged_by

flagged_by(user)
Source code in hypha/apply/funds/models/submissions.py
def flagged_by(self, user):
    """Return submissions carrying a ``Flag.USER`` flag set by ``user``."""
    lookup = {"flags__user": user, "flags__type": Flag.USER}
    return self.filter(**lookup)

flagged_staff

flagged_staff()
Source code in hypha/apply/funds/models/submissions.py
def flagged_staff(self):
    """Return submissions that carry a flag of type ``Flag.STAFF``."""
    staff_flag = Flag.STAFF
    return self.filter(flags__type=staff_flag)

partner_for

partner_for(user)
Source code in hypha/apply/funds/models/submissions.py
def partner_for(self, user):
    """Return submissions that list ``user`` among their partners."""
    lookup = {"partners": user}
    return self.filter(**lookup)

awaiting_determination_for

awaiting_determination_for(user)
Source code in hypha/apply/funds/models/submissions.py
def awaiting_determination_for(self, user):
    """Return submissions led by ``user`` whose status is in ``DETERMINATION_RESPONSE_PHASES``."""
    awaiting = self.filter(status__in=DETERMINATION_RESPONSE_PHASES)
    return awaiting.filter(lead=user)

undetermined

undetermined()
Source code in hypha/apply/funds/models/submissions.py
def undetermined(self):
    """Return the submissions in this queryset that have no final determination."""
    final_determinations = Determination.objects.filter(submission__in=self).final()
    determined_ids = final_determinations.values("submission")
    return self.exclude(pk__in=determined_ids)

current

current()
Source code in hypha/apply/funds/models/submissions.py
def current(self):
    """Return submissions that have no ``next`` submission and are not archived."""
    # Applications which have the current stage active (have not been
    # progressed) and have not been archived.
    already_progressed = Q(next__isnull=False)
    is_archived = Q(is_archive=True)
    return self.exclude(already_progressed | is_archived)

archived

archived()
Source code in hypha/apply/funds/models/submissions.py
def archived(self):
    """Return only the submissions flagged as archived."""
    lookup = {"is_archive": True}
    return self.filter(**lookup)

include_archive

include_archive()
Source code in hypha/apply/funds/models/submissions.py
def include_archive(self):
    """Return submissions with no ``next`` submission, archived or not."""
    # Show all submissions (current submissions + archived submissions)
    lookup = {"next__isnull": False}
    return self.exclude(**lookup)

current_accepted

current_accepted()
Source code in hypha/apply/funds/models/submissions.py
def current_accepted(self):
    """Return current submissions whose status is one of the "accepted" statuses."""
    # Applications which have the current stage active (have not been progressed)
    accepted_status_names = PHASES_MAPPING["accepted"]["statuses"]
    accepted = self.filter(status__in=accepted_status_names)
    return accepted.current()

value

value()
Source code in hypha/apply/funds/models/submissions.py
def value(self):
    """Aggregate the ``value`` key of ``form_data`` across the queryset.

    The key is read as text, cast to a float, and then summarised with
    ``Count``, ``Avg`` and ``Sum``; the result of ``aggregate()`` is returned.
    """
    value_as_float = Cast(
        KeyTextTransform("value", "form_data"), output_field=FloatField()
    )
    annotated = self.annotate(value=value_as_float)
    return annotated.aggregate(Count("value"), Avg("value"), Sum("value"))

exclude_draft

exclude_draft()
Source code in hypha/apply/funds/models/submissions.py
def exclude_draft(self):
    """Return the queryset without submissions whose status is ``DRAFT_STATE``."""
    draft_status = DRAFT_STATE
    return self.exclude(status=draft_status)

with_latest_update

with_latest_update()
Source code in hypha/apply/funds/models/submissions.py
def with_latest_update(self):
    """Annotate each submission with ``last_user_update`` and ``last_update``.

    Both annotations are subqueries over the submission's activities:
    the first row's ``user__full_name`` and ``timestamp`` respectively.
    """
    activity_model = self.model.activities.rel.model
    # NOTE(review): no explicit ordering is applied here, so "latest"
    # presumably relies on the activity model's default ordering - confirm.
    submission_activities = activity_model.objects.filter(
        submission=OuterRef("id")
    ).select_related("user")
    last_user = Subquery(submission_activities[:1].values("user__full_name"))
    last_timestamp = Subquery(submission_activities.values("timestamp")[:1])
    return self.annotate(last_user_update=last_user, last_update=last_timestamp)

for_table

for_table(user)
Source code in hypha/apply/funds/models/submissions.py
def for_table(self, user):
    """Return the queryset annotated and pre-joined for table display.

    Adds the latest-update annotations, the review recommendation/count
    and comment-count annotations, and a ``role_icon`` annotation taken
    from the first role assigned to ``user`` on each submission. Related
    objects are selected/prefetched in the same query set.
    """
    assigned_model = self.model.assigned.field.model
    roles_for_review = assigned_model.objects.with_roles().filter(
        submission=OuterRef("id"), reviewer=user
    )

    annotated = annotate_review_recommendation_and_count(self.with_latest_update())
    annotated = annotate_comments_count(annotated, user)
    annotated = annotated.annotate(
        role_icon=Subquery(roles_for_review[:1].values("role__icon")),
    )

    joined_relations = (
        "page",
        "round",
        "lead",
        "user",
        "previous__page",
        "previous__round",
        "previous__lead",
    )
    return annotated.select_related(*joined_relations).prefetch_related(
        "screening_statuses"
    )

AddTransitions

Bases: ModelBase

ApplicationSubmissionMetaclass

ApplicationSubmission

Bases: WorkflowHelpers, BaseStreamForm, AccessFormData, AbstractFormSubmission

stream_file_class class-attribute instance-attribute

stream_file_class = PrivateStreamFieldFile

storage_class class-attribute instance-attribute

storage_class = PrivateStorage

raw_data property

raw_data

question_field_ids property

question_field_ids

file_field_ids property

file_field_ids

question_text_field_ids property

question_text_field_ids

first_group_question_text_field_ids property

first_group_question_text_field_ids

raw_fields property

raw_fields

fields property

fields

named_blocks property

named_blocks

normal_blocks property

normal_blocks

group_toggle_blocks property

group_toggle_blocks

first_group_normal_text_blocks property

first_group_normal_text_blocks

submission_form_class class-attribute instance-attribute

submission_form_class = PageStreamBaseForm

WORKFLOW_CHOICES class-attribute instance-attribute

WORKFLOW_CHOICES = {name: name for (name, workflow) in items()}

workflow_name class-attribute instance-attribute

workflow_name = CharField(choices=items(), max_length=100, default='single', verbose_name=gettext_lazy('Workflow'))

workflow property

workflow

form_data class-attribute instance-attribute

form_data = JSONField(encoder=StreamFieldDataEncoder)

form_fields class-attribute instance-attribute

form_fields = StreamField(ApplicationCustomFormFieldsBlock(), use_json_field=True)

public_id class-attribute instance-attribute

public_id = CharField(max_length=255, null=True, blank=True, unique=True, db_index=True)

summary class-attribute instance-attribute

summary = TextField(default='', null=True, blank=True)

page class-attribute instance-attribute

page = ForeignKey('wagtailcore.Page', on_delete=PROTECT)

round class-attribute instance-attribute

round = ForeignKey('wagtailcore.Page', on_delete=PROTECT, related_name='submissions', null=True)

lead class-attribute instance-attribute

lead = ForeignKey(AUTH_USER_MODEL, limit_choices_to=LIMIT_TO_STAFF, related_name='submission_lead', on_delete=PROTECT)

next class-attribute instance-attribute

next = OneToOneField('self', on_delete=CASCADE, related_name='previous', null=True)

reviewers class-attribute instance-attribute

reviewers = ManyToManyField(AUTH_USER_MODEL, related_name='submissions_reviewer', blank=True, through='AssignedReviewers')

partners class-attribute instance-attribute

partners = ManyToManyField(AUTH_USER_MODEL, related_name='submissions_partner', limit_choices_to=LIMIT_TO_PARTNERS, blank=True)

meta_terms class-attribute instance-attribute

meta_terms = ManyToManyField(MetaTerm, related_name='submissions', blank=True)

flags class-attribute instance-attribute

flags = GenericRelation(Flag, content_type_field='target_content_type', object_id_field='target_object_id', related_query_name='submission')

activities class-attribute instance-attribute

activities = GenericRelation('activity.Activity', content_type_field='source_content_type', object_id_field='source_object_id', related_query_name='submission')

user class-attribute instance-attribute

user = ForeignKey(AUTH_USER_MODEL, on_delete=SET_NULL, null=True)

search_data class-attribute instance-attribute

search_data = TextField()

search_document class-attribute instance-attribute

search_document = SearchVectorField(null=True)

status class-attribute instance-attribute

status = FSMField(default=INITIAL_STATE, protected=True)

screening_statuses class-attribute instance-attribute

screening_statuses = ManyToManyField('funds.ScreeningStatus', related_name='submissions', blank=True)

submit_time class-attribute instance-attribute

submit_time = DateTimeField(verbose_name=gettext_lazy('submit time'), auto_now_add=False)

live_revision class-attribute instance-attribute

live_revision = OneToOneField('ApplicationRevision', on_delete=CASCADE, related_name='live', null=True, editable=False)

draft_revision class-attribute instance-attribute

draft_revision = OneToOneField('ApplicationRevision', on_delete=CASCADE, related_name='draft', null=True, editable=False)

drupal_id class-attribute instance-attribute

drupal_id = IntegerField(null=True, blank=True, editable=False)

is_archive class-attribute instance-attribute

is_archive = BooleanField(default=False)

objects class-attribute instance-attribute

objects = as_manager()

wagtail_reference_index_ignore class-attribute instance-attribute

wagtail_reference_index_ignore = True

is_draft property

is_draft

title_text_display property

title_text_display

Return the title text for display across the site.

Use SUBMISSION_TITLE_TEXT_TEMPLATE setting to change format.

stage property

stage

phase property

phase

active property

active

is_determination_form_attached property

is_determination_form_attached

We previously used plain Django determination forms, but we are now moving to StreamField determination forms, which can be created and attached to funds in the admin.

This method checks whether new determination forms are attached to the submission; if none are, the old determination forms are still used for backward compatibility.

has_all_reviewer_roles_assigned property

has_all_reviewer_roles_assigned

community_review property

community_review

missing_reviewers property

missing_reviewers

staff_not_reviewed property

staff_not_reviewed

reviewers_not_reviewed property

reviewers_not_reviewed

flagged_staff property

flagged_staff

ready_for_determination property

ready_for_determination

accepted_for_funding property

accepted_for_funding

in_final_stage property

in_final_stage

in_internal_review_phase property

in_internal_review_phase

in_external_review_phase property

in_external_review_phase

is_finished property

is_finished

Meta

indexes class-attribute instance-attribute
indexes = [GinIndex(fields=['search_document'])]

stream_file classmethod

stream_file(instance, field, file)
Source code in hypha/apply/funds/models/mixins.py
@classmethod
def stream_file(cls, instance, field, file):
    """Wrap ``file`` in ``cls.stream_file_class`` for the given field.

    Returns ``[]`` for a falsy ``file`` and returns ``file`` unchanged when
    it already is a ``stream_file_class`` instance. ``File`` objects,
    ``PlaceholderUploadedFile`` objects and dict-like records are each
    converted into a new ``stream_file_class`` backed by
    ``cls.storage_class()``.
    """
    if not file:
        return []

    wrapper = cls.stream_file_class
    if isinstance(file, wrapper):
        return file

    if isinstance(file, File):
        return wrapper(
            instance, field, file, name=file.name, storage=cls.storage_class()
        )

    if isinstance(file, PlaceholderUploadedFile):
        stored_name = file.file_id
        display_name = file.name
    else:
        # This fixes a backwards compatibility issue with #507
        # Once every application has been re-saved it should be possible to remove it
        if "path" in file:
            file["filename"] = file["name"]
            file["name"] = file["path"]
        stored_name = file["name"]
        display_name = file.get("filename")

    return wrapper(
        instance,
        field,
        None,
        name=stored_name,
        filename=display_name,
        storage=cls.storage_class(),
    )

process_file classmethod

process_file(instance, field, file)
Source code in hypha/apply/funds/models/mixins.py
@classmethod
def process_file(cls, instance, field, file):
    """Run ``stream_file`` on ``file``, element-wise when it is a list."""
    if not isinstance(file, list):
        return cls.stream_file(instance, field, file)
    return [cls.stream_file(instance, field, item) for item in file]

process_file_data

process_file_data(data)
Source code in hypha/apply/funds/models/mixins.py
def process_file_data(self, data):
    """Process, save and store the uploads for every uploadable form field.

    For each field whose block is an ``UploadableMediaBlock`` the matching
    entry of ``data`` (default ``[]``) is run through ``process_file``,
    saved, and written to ``self.form_data`` under the field id.
    """
    for form_field in self.form_fields:
        if not isinstance(form_field.block, UploadableMediaBlock):
            continue

        processed = self.process_file(self, form_field, data.get(form_field.id, []))
        try:
            processed.save()
        except (AttributeError, FileNotFoundError):
            # No usable single ``save()``: fall back to saving each item.
            try:
                for item in processed:
                    item.save()
            except FileNotFoundError:
                # Best effort: a missing file is ignored.
                pass
        self.form_data[form_field.id] = processed

extract_files

extract_files()
Source code in hypha/apply/funds/models/mixins.py
def extract_files(self):
    """Remove uploadable-field entries from ``form_data`` and return them.

    Returns a dict mapping each ``UploadableMediaBlock`` field id to its
    current data (or ``[]`` when that data is falsy).
    """
    extracted = {}
    for form_field in self.form_fields:
        if not isinstance(form_field.block, UploadableMediaBlock):
            continue
        field_id = form_field.id
        extracted[field_id] = self.data(field_id) or []
        self.form_data.pop(field_id, None)
    return extracted

from_db classmethod

from_db(db, field_names, values)
Source code in hypha/apply/stream_forms/models.py
@classmethod
def from_db(cls, db, field_names, values):
    """Build an instance from a database row, decoding ``form_data`` if loaded.

    When ``form_data`` is among ``field_names`` its value is replaced by the
    result of ``cls.deserialize_form_data``; otherwise the instance produced
    by the parent ``from_db`` is returned untouched.
    """
    instance = super().from_db(db, field_names, values)
    if "form_data" not in field_names:
        return instance

    instance.form_data = cls.deserialize_form_data(
        instance, instance.form_data, instance.form_fields
    )
    return instance

deserialised_data classmethod

deserialised_data(instance, data, form_fields)
Source code in hypha/apply/funds/models/mixins.py
@classmethod
def deserialised_data(cls, instance, data, form_fields):
    """Return a copy of ``data`` with file records turned into file objects.

    Only entries belonging to ``UploadableMediaBlock`` fields with a truthy
    id are converted (via ``cls.process_file``); ``data`` is not modified.
    """
    converted = data.copy()
    # PERFORMANCE NOTE:
    # Do not attempt to iterate over form_fields - that will fully instantiate the form_fields
    # including any sub queries that they do
    for index, raw_field in enumerate(form_fields.raw_data):
        block = form_fields.stream_block.child_blocks[raw_field["type"]]
        if not isinstance(block, UploadableMediaBlock):
            continue
        field_id = raw_field.get("id")
        if not field_id:
            continue
        converted[field_id] = cls.process_file(
            instance, form_fields[index], converted.get(field_id, [])
        )
    return converted

get_definitive_id

get_definitive_id(id)
Source code in hypha/apply/funds/models/mixins.py
def get_definitive_id(self, id):
    """Translate a named-block name into its field id; pass other ids through."""
    named = self.named_blocks
    return named[id] if id in named else id

field

field(id)
Source code in hypha/apply/funds/models/mixins.py
def field(self, id):
    """Return the raw field for ``id`` (a field id or a named-block name).

    Raises:
        UnusedFieldException: if the resolved id is not in ``raw_fields``.
    """
    resolved_id = self.get_definitive_id(id)
    try:
        found = self.raw_fields[resolved_id]
    except KeyError:
        raise UnusedFieldException(id) from None
    return found

data

data(id)
Source code in hypha/apply/funds/models/mixins.py
def data(self, id):
    """Return the raw answer stored for ``id``, or ``None`` when there is none."""
    resolved_id = self.get_definitive_id(id)
    try:
        stored = self.raw_data[resolved_id]
    except KeyError:
        # We have most likely progressed application forms so the data isn't in form_data
        stored = None
    return stored

get_serialize_multi_inputs_answer

get_serialize_multi_inputs_answer(field)
Source code in hypha/apply/funds/models/mixins.py
def get_serialize_multi_inputs_answer(self, field):
    """Join the non-empty answers of a multi-input field with ``", "``.

    Answers are read from ``<field.id>_<index>`` for every index below the
    field's ``number_of_inputs`` value.
    """
    input_count = field.value.get("number_of_inputs")
    collected = []
    for index in range(input_count):
        collected.append(self.data(field.id + "_" + str(index)))
    return ", ".join(answer for answer in collected if answer)

serialize

serialize(field_id)
Source code in hypha/apply/funds/models/mixins.py
def serialize(self, field_id):
    """Render the field ``field_id`` in serialize mode with its stored answer.

    Multi-input fields get their answers joined into one string first;
    every other field uses the value returned by ``self.data``.
    """
    field = self.field(field_id)
    if isinstance(field.block, MultiInputCharFieldBlock):
        answer = self.get_serialize_multi_inputs_answer(field)
    else:
        answer = self.data(field_id)
    render_context = {"serialize": True, "data": answer}
    return field.render(context=render_context)

get_multi_inputs_answer

get_multi_inputs_answer(field, include_question=False)
Source code in hypha/apply/funds/models/mixins.py
def get_multi_inputs_answer(self, field, include_question=False):
    """Render every non-empty answer of a multi-input field as one section.

    Each answer is rendered separately (the question is only requested for
    the first rendered answer), the individual ``</section>`` closers are
    stripped, and a single closing ``</section>`` is appended.
    """
    input_count = field.value.get("number_of_inputs")
    answers = [
        self.data(field.id + "_" + str(index)) for index in range(input_count)
    ]

    fragments = []
    for position, answer in enumerate(filter(None, answers)):
        context = {
            "data": answer,
            "include_question": include_question if position == 0 else False,
        }
        fragments.append(field.render(context=context))

    merged = "".join(fragments).replace("</section>", "")
    return merged + "</section>"

render_answer

render_answer(field_id, include_question=False)
Source code in hypha/apply/funds/models/mixins.py
def render_answer(self, field_id, include_question=False):
    """Render the stored answer for ``field_id``.

    Returns ``"-"`` when the field is not part of this form. Multi-input
    fields are delegated to ``get_multi_inputs_answer``. For all other
    fields a falsy stored value is rendered as an empty string.
    """
    try:
        field = self.field(field_id)
    except UnusedFieldException:
        return "-"

    if isinstance(field.block, MultiInputCharFieldBlock):
        return self.get_multi_inputs_answer(field, include_question)

    # Some migrated content have empty address.
    answer = self.data(field_id) or ""
    return field.render(
        context={"data": answer, "include_question": include_question}
    )

render_answers

render_answers()
Source code in hypha/apply/funds/models/mixins.py
def render_answers(self):
    """Return the rendered answer (with question) for every normal block."""
    rendered = []
    for field_id in self.normal_blocks:
        rendered.append(self.render_answer(field_id, include_question=True))
    return rendered

render_first_group_text_answers

render_first_group_text_answers()
Source code in hypha/apply/funds/models/mixins.py
def render_first_group_text_answers(self):
    """Return the rendered answer (with question) for each first-group text block."""
    rendered = []
    for field_id in self.first_group_normal_text_blocks:
        rendered.append(self.render_answer(field_id, include_question=True))
    return rendered

render_text_blocks_answers

render_text_blocks_answers()
Source code in hypha/apply/funds/models/mixins.py
def render_text_blocks_answers(self):
    """Return rendered answers for text questions that are not named blocks."""
    rendered = []
    for field_id in self.question_text_field_ids:
        if field_id in self.named_blocks:
            continue
        rendered.append(self.render_answer(field_id, include_question=True))
    return rendered

output_answers

output_answers()
Source code in hypha/apply/funds/models/mixins.py
def output_answers(self):
    """Return all rendered answers concatenated and marked as safe."""
    combined = "".join(self.render_answers())
    return mark_safe(combined)

output_text_answers

output_text_answers()
Source code in hypha/apply/funds/models/mixins.py
def output_text_answers(self):
    """Return the rendered text-block answers concatenated and marked as safe."""
    combined = "".join(self.render_text_blocks_answers())
    return mark_safe(combined)

output_first_group_text_answers

output_first_group_text_answers()
Source code in hypha/apply/funds/models/mixins.py
def output_first_group_text_answers(self):
    """Return the rendered first-group text answers concatenated and marked as safe."""
    combined = "".join(self.render_first_group_text_answers())
    return mark_safe(combined)

get_answer_from_label

get_answer_from_label(label)
Source code in hypha/apply/funds/models/mixins.py
def get_answer_from_label(self, label):
    """Return the first usable answer to a text question matching ``label``.

    Named blocks are skipped. A question matches when ``label`` occurs in
    its text, ignoring case. Non-string answers are joined with ``","``.
    Empty answers and the answer ``"N"`` are passed over. Returns ``None``
    when nothing qualifies.
    """
    wanted = label.lower()
    for field_id in self.question_text_field_ids:
        if field_id in self.named_blocks:
            continue
        serialized = self.serialize(field_id)
        if wanted not in serialized["question"].lower():
            continue
        raw_answer = serialized["answer"]
        answer = raw_answer if isinstance(raw_answer, str) else ",".join(raw_answer)
        if answer and answer != "N":
            return answer
    return None

deserialize_form_data classmethod

deserialize_form_data(instance, form_data, form_fields)
Source code in hypha/apply/stream_forms/models.py
@classmethod
def deserialize_form_data(cls, instance, form_data, form_fields):
    """Return a copy of ``form_data`` with each stored value decoded by its block.

    For every raw field definition the matching block is looked up by its
    ``type``; when ``form_data`` holds a value under the field's id, that
    value is replaced by ``block.decode(value)``. Entries without a matching
    field definition are copied unchanged, and ``form_data`` itself is not
    modified.
    """
    data = form_data.copy()
    # PERFORMANCE NOTE:
    # Do not attempt to iterate over form_fields - that will fully instantiate the form_fields
    # including any sub queries that they do
    for field_data in form_fields.raw_data:
        block = form_fields.stream_block.child_blocks[field_data["type"]]
        field_id = field_data.get("id")
        try:
            value = data[field_id]
        except KeyError:
            # No stored answer for this field; nothing to decode.
            continue
        data[field_id] = block.decode(value)
    return data

get_defined_fields

get_defined_fields()
Source code in hypha/apply/stream_forms/models.py
def get_defined_fields(self):
    """Return the form field definitions stored on this object."""
    defined = self.form_fields
    return defined

get_form_fields

get_form_fields(draft=False, form_data=None, user=None)
Source code in hypha/apply/stream_forms/models.py
def get_form_fields(self, draft=False, form_data=None, user=None):
    """Build the ordered mapping of form field names to field objects.

    Walks the blocks returned by ``get_defined_fields()`` and produces one
    entry per block, keyed by the block id. Multi-input blocks instead
    produce one entry per input, keyed ``<id>_<index>``.

    Args:
        draft: when true, every field whose block is not an
            ``ApplicationMustIncludeFieldBlock`` subclass is made optional.
        form_data: previously submitted data; only used to decide whether
            the fields of a toggle group are visible. Defaults to ``{}``.
        user: when authenticated, full-name and e-mail fields are
            pre-filled from the account (and disabled where a value exists).

    Returns:
        An ``OrderedDict`` of form fields; non-form blocks are wrapped in
        ``BlockFieldWrapper``.
    """
    if form_data is None:
        form_data = {}

    form_fields = OrderedDict()
    field_blocks = self.get_defined_fields()
    # Group 1 is the default group; each toggle block opens the next one.
    group_counter = 1
    is_in_group = False

    # If true option 1 is selected
    grouped_fields_visible = False
    for struct_child in field_blocks:
        block = struct_child.block
        struct_value = struct_child.value
        if isinstance(block, FormFieldBlock):
            field_from_block = block.get_field(struct_value)
            disabled_help_text = _(
                "You are logged in so this information is fetched from your user account."
            )
            if isinstance(block, FullNameBlock) and user and user.is_authenticated:
                if user.full_name:
                    field_from_block.disabled = True
                    field_from_block.initial = user.full_name
                    field_from_block.help_text = disabled_help_text
                else:
                    field_from_block.help_text = _(
                        "You are logged in but your user account does not have a "
                        "full name. We'll update your user account with the name you provide here."
                    )
            if isinstance(block, EmailBlock) and user and user.is_authenticated:
                field_from_block.disabled = True
                field_from_block.initial = user.email
                field_from_block.help_text = disabled_help_text
            # Drafts may be saved incomplete, except for must-include fields.
            if draft and not issubclass(
                block.__class__, ApplicationMustIncludeFieldBlock
            ):
                field_from_block.required = False
            field_from_block.help_link = struct_value.get("help_link")
            field_from_block.group_number = group_counter if is_in_group else 1
            # A toggle met outside a group opens a new group; the toggle
            # itself stays in group 1 and records which group it controls.
            if isinstance(block, GroupToggleBlock) and not is_in_group:
                field_from_block.group_number = 1
                field_from_block.grouper_for = group_counter + 1
                group_counter += 1
                is_in_group = True
                # The group is visible when the submitted toggle value equals
                # the toggle's first choice.
                grouped_fields_visible = (
                    form_data.get(struct_child.id) == field_from_block.choices[0][0]
                )
            if isinstance(block, TextFieldBlock):
                field_from_block.word_limit = struct_value.get("word_limit")
            if isinstance(block, MultiInputCharFieldBlock):
                number_of_inputs = struct_value.get("number_of_inputs")
                for index in range(number_of_inputs):
                    # The field is registered first and mutated afterwards;
                    # both names refer to the same object until the copy at
                    # the end of this iteration.
                    form_fields[struct_child.id + "_" + str(index)] = (
                        field_from_block
                    )
                    field_from_block.multi_input_id = struct_child.id
                    field_from_block.add_button_text = struct_value.get(
                        "add_button_text"
                    )
                    if (
                        index == number_of_inputs - 1
                    ):  # Add button after last input field
                        field_from_block.multi_input_add_button = True
                        # Index for field until which fields will be visible to applicant.
                        # Initially only the first field with id UUID_0 will be visible.
                        field_from_block.visibility_index = 0
                        field_from_block.max_index = index
                    if index != 0:
                        field_from_block.multi_input_field = True
                        field_from_block.required = False
                        field_from_block.initial = None
                    # Shallow copy so the next input gets its own field object.
                    field_from_block = copy.copy(field_from_block)
            else:
                if is_in_group and not isinstance(block, GroupToggleBlock):
                    # Remember the original requirement, then only enforce it
                    # while the group is visible.
                    field_from_block.required_when_visible = (
                        field_from_block.required
                    )
                    field_from_block.required = (
                        field_from_block.required & grouped_fields_visible
                    )
                    field_from_block.visible = grouped_fields_visible
                form_fields[struct_child.id] = field_from_block
        elif isinstance(block, GroupToggleEndBlock):
            # Group toggle end block is used only to group fields and not used in actual form.
            # Todo: Use streamblock to create nested form field blocks, a more elegant method to group form fields.
            is_in_group = False
        else:
            # Any other block is not a form input; wrap it for the form.
            field_wrapper = BlockFieldWrapper(struct_child)
            field_wrapper.group_number = group_counter if is_in_group else 1
            form_fields[struct_child.id] = field_wrapper

    return form_fields

get_form_class

get_form_class(draft=False, form_data=None, user=None)
Source code in hypha/apply/stream_forms/models.py
def get_form_class(self, draft=False, form_data=None, user=None):
    """Create a ``WagtailStreamForm`` class from this object's form fields.

    The new class derives from ``self.submission_form_class`` and takes the
    mapping returned by ``get_form_fields`` as its class attributes.
    """
    bases = (self.submission_form_class,)
    attributes = self.get_form_fields(draft, form_data, user)
    return type("WagtailStreamForm", bases, attributes)

not_progressed

not_progressed()
Source code in hypha/apply/funds/models/submissions.py
def not_progressed(self):
    """Return ``True`` when this submission has no ``next`` submission."""
    has_next = bool(self.next)
    return not has_next

restart_stage

restart_stage(**kwargs)

If running from the console, please include your user using the kwarg "by":

u = User.objects.get(email="my@email.com")
for a in ApplicationSubmission.objects.all():
    a.restart_stage(by=u)
    a.save()

Source code in hypha/apply/funds/models/submissions.py
@transition(
    status,
    source="*",
    target=RETURN_VALUE(INITIAL_STATE, "draft_proposal", "invited_to_proposal"),
    permission=make_permission_check({UserPermissions.ADMIN}),
)
def restart_stage(self, **kwargs):
    """Return the state this submission restarts its current stage in.

    Returns "draft_proposal" when the submission has a ``previous``
    submission, "invited_to_proposal" when it has a ``next`` one, and
    ``INITIAL_STATE`` otherwise. The returned value is presumably used as
    the transition target (see ``RETURN_VALUE`` in the decorator).

    If running from the console please include your user using the kwarg "by"

    u = User.objects.get(email="my@email.com")
    for a in ApplicationSubmission.objects.all():
        a.restart_stage(by=u)
        a.save()
    """
    if hasattr(self, "previous"):
        return "draft_proposal"
    if self.next:
        return "invited_to_proposal"
    return INITIAL_STATE

ensure_user_has_account

ensure_user_has_account()
Source code in hypha/apply/funds/models/submissions.py
def ensure_user_has_account(self):
    """Attach a user account to the submission and make them an applicant.

    With an authenticated ``self.user`` the form's e-mail is overwritten
    from the account and the full name is synchronised (the account wins
    when it has a name, otherwise the account is updated from the form).
    Otherwise a user is fetched or created from the form's ``email`` and
    ``full_name``; the creation notification is skipped when the form data
    contains ``skip_account_creation_notification`` (which is then removed).
    Finally the user is added to the applicant group if not already in it.
    """
    if self.user and self.user.is_authenticated:
        self.form_data["email"] = self.user.email
        account_name = self.user.get_full_name()
        if account_name:
            self.form_data["full_name"] = account_name
        else:
            # user doesn't have name set, so use the one from the form
            self.user.full_name = self.form_data["full_name"]
            self.user.save()
    else:
        # Rely on the form having the following must include fields (see blocks.py)
        email = self.form_data.get("email")
        full_name = self.form_data.get("full_name")
        name_defaults = {"full_name": full_name}

        User = get_user_model()
        if "skip_account_creation_notification" in self.form_data:
            self.form_data.pop("skip_account_creation_notification", None)
            self.user, _created = User.objects.get_or_create(
                email=email, defaults=name_defaults
            )
        else:
            self.user, _created = User.objects.get_or_create_and_notify(
                email=email,
                site=self.page.get_site(),
                defaults=name_defaults,
            )

    # Make sure the user is in the applicant group
    if self.user.is_applicant:
        return
    applicant_group = Group.objects.get(name=APPLICANT_GROUP_NAME)
    self.user.groups.add(applicant_group)
    self.user.save()

get_from_parent

get_from_parent(attribute)
Source code in hypha/apply/funds/models/submissions.py
def get_from_parent(self, attribute):
    """Look up ``attribute`` on the parent round, falling back to the page.

    The attribute is read from ``self.round.specific``. If that raises
    ``AttributeError`` (for example because there is no round), it is read
    from ``self.page.specific`` instead.
    """
    try:
        parent = self.round.specific
        value = getattr(parent, attribute)
    except AttributeError:
        # We are a lab submission
        value = getattr(self.page.specific, attribute)
    return value

progress_application

progress_application(**kwargs)
Source code in hypha/apply/funds/models/submissions.py
def progress_application(self, **kwargs):
    """Save this submission as a new record for the next stage and link it.

    The target stage is taken from the ``STAGE_CHANGE_ACTIONS`` phases whose
    transition can currently proceed (the last matching phase wins). The
    instance is then saved again with its ``id``, ``public_id`` and revision
    pointers cleared, the previously stored record's meta terms are copied
    over, and the stored record's ``next`` is pointed at this instance.

    Args:
        **kwargs: ``proposal_form`` (optional) is converted to ``int`` and
            passed as ``form_index`` to the parent's ``get_defined_fields``;
            a missing or falsy value means ``0``.

    Raises:
        ValueError: if no stage-change transition can proceed.
    """
    target = None
    for phase in STAGE_CHANGE_ACTIONS:
        if not can_proceed(self.get_transition(phase)):
            continue
        # We convert to dict as not concerned about transitions from the first phase
        # See note in workflow.py
        target = dict(PHASES)[phase].stage
    if not target:
        raise ValueError("Incorrect State for transition")

    # Fetch the stored record before this instance's identity is cleared
    original = ApplicationSubmission.objects.get(id=self.id)
    inherited_meta_terms = original.meta_terms.all()

    self.id = None
    self.public_id = None
    form_index = int(kwargs.get("proposal_form") or 0)
    get_defined_fields = self.get_from_parent("get_defined_fields")
    self.form_fields = get_defined_fields(target, form_index=form_index)

    self.live_revision = None
    self.draft_revision = None
    self.save()
    self.meta_terms.set(inherited_meta_terms)

    original.next = self
    original.save()

from_draft

from_draft()

Sets current form_data to the form_data from the draft revision.

Returns:

  • Self –

    Self with the form_data attribute updated.

Source code in hypha/apply/funds/models/submissions.py
def from_draft(self) -> Self:
    """Sets current `form_data` to the `form_data` from the draft revision.

    Returns:
        Self with the `form_data` attribute updated.
    """
    self.form_data = self.deserialised_data(
        self, self.draft_revision.form_data, self.form_fields
    )

    return self

create_revision

create_revision(draft=False, force=False, by=None, **kwargs)

Create a new revision on the submission

This is used to save drafts, track changes when an RFI is made and save changes before rendering a preview

Parameters:

  • draft –

    if the revision is a draft

  • force –

    force a revision even if form data is the same

  • by (Optional[AnonymousUser | AbstractBaseUser], default: None ) –

    the author of the revision

  • preview –

    if the revision is being used to save before a preview

Returns:

  • Optional[Model] –

    Returns the ApplicationRevision if it was created, otherwise returns None

Source code in hypha/apply/funds/models/submissions.py
def create_revision(
    self,
    draft=False,
    force=False,
    by: Optional[AnonymousUser | AbstractBaseUser] = None,
    **kwargs,
) -> Optional[models.Model]:
    """Create a new revision on the submission

    This is used to save drafts, track changes when an RFI is made and
    save changes before rendering a preview

    Args:
        draft: if the revision is a draft
        force: force a revision even if form data is the same
        by: the author of the revision
        preview: if the revision is being used to save before a preview
            (accepted through `**kwargs`; not read by this method)

    Returns:
        Returns the [`ApplicationRevision`][hypha.apply.funds.models.ApplicationRevision] if it was created, otherwise returns `None`
    """
    ApplicationRevision = apps.get_model("funds", "ApplicationRevision")
    self.clean_submission()
    # Compare against the form data currently stored in the database
    current_submission = ApplicationSubmission.objects.get(id=self.id)
    current_data = current_submission.form_data
    if current_data != self.form_data or force:
        if self.live_revision == self.draft_revision:
            # Live and draft point at the same revision: record the change
            # in a new revision instead of modifying that one
            revision = ApplicationRevision.objects.create(
                submission=self,
                form_data=self.form_data,
                author=by,
                is_draft=draft,
            )
        else:
            # A separate draft revision already exists: update it in place
            revision = self.draft_revision
            revision.form_data = self.form_data
            revision.author = by
            revision.save()

        if draft:
            # Draft save: the submission keeps the stored form data, the
            # new data is held on the revision only
            self.form_data = current_submission.form_data
        else:
            # Move the revision state out of draft as it is being submitted
            if revision.is_draft:
                revision.is_draft = False
                revision.save()
            self.live_revision = revision
            self.search_data = " ".join(list(self.prepare_search_values()))
            self.search_document = self.prepare_search_vector()

        self.draft_revision = revision
        # skip_custom bypasses the custom logic in save()
        self.save(skip_custom=True)
        return revision
    else:
        revision = self.draft_revision

        # Utilized when the user has previously saved a draft,
        # then doesn't edit the draft but submits it straight
        # from the edit view
        if not draft and revision.is_draft:
            revision.is_draft = False
            revision.save()
            self.live_revision = revision
            self.search_data = " ".join(list(self.prepare_search_values()))
            self.search_document = self.prepare_search_vector()
            self.save(skip_custom=True)

            return revision

    # Nothing changed and there was no draft to promote
    return None

clean_submission

clean_submission()
Source code in hypha/apply/funds/models/submissions.py
def clean_submission(self):
    """Run the submission clean-up steps in their fixed order.

    Calls ``process_form_data``, then ``ensure_user_has_account``, then
    ``process_file_data`` with the resulting ``form_data``.
    """
    self.process_form_data()
    self.ensure_user_has_account()
    # Read form_data only after the two steps above have run
    cleaned_data = self.form_data
    self.process_file_data(cleaned_data)

get_assigned_meta_terms

get_assigned_meta_terms()

Returns assigned meta terms excluding the 'root' term

Source code in hypha/apply/funds/models/submissions.py
def get_assigned_meta_terms(self):
    """Returns assigned meta terms excluding the 'root' term"""
    # The 'root' term is the one stored at depth 1
    root_depth = 1
    return self.meta_terms.exclude(depth=root_depth)

process_form_data

process_form_data()
Source code in hypha/apply/funds/models/submissions.py
def process_form_data(self):
    """Re-key named block answers from their field id to their field name.

    For every ``(name, id)`` pair in ``named_blocks`` the value stored under
    the id is removed from ``form_data``. Truthy values are stored again
    under the name; falsy or missing values leave any existing entry for
    the name untouched.
    """
    for field_name, field_id in self.named_blocks.items():
        if answer := self.form_data.pop(field_id, None):
            self.form_data[field_name] = answer

save

save(*args, update_fields=None, skip_custom=False, **kwargs)
Source code in hypha/apply/funds/models/submissions.py
def save(self, *args, update_fields=None, skip_custom=False, **kwargs):
    """Save the submission, running the custom processing unless bypassed.

    Args:
        update_fields: when non-empty and not containing ``"form_data"``,
            the parent ``save`` is called directly with these fields and
            none of the custom processing runs.
        skip_custom: when true, the parent ``save`` is called directly.
            Note that ``update_fields`` is not forwarded on this path.

    On the full path the submission is cleaned, the search fields are
    rebuilt and the parent ``save`` is called (again without
    ``update_fields``). When the object is being created (it has no ``id``
    yet) the submit time, workflow name and lead are set before saving;
    afterwards the public id, files, reviewers and the first revision are
    created and the object is saved a second time.
    """
    if update_fields is None:
        update_fields = []
    if update_fields and "form_data" not in update_fields:
        # We don't want to use this approach if the user is sending data
        return super().save(*args, update_fields=update_fields, **kwargs)
    elif skip_custom:
        return super().save(*args, **kwargs)

    # No id yet means this is the first save of the object
    creating = not self.id

    if creating:
        self.submit_time = timezone.now()
        # We are creating the object default to first stage
        self.workflow_name = self.get_from_parent("workflow_name")
        # Copy extra relevant information to the child
        self.lead = self.get_from_parent("lead")

        # We need the submission id to correctly save the files
        files = self.extract_files()

    self.clean_submission()

    # add a denormed version of the answer for searching
    # @TODO: remove 'search_data' in favour of 'search_document' for FTS
    self.search_data = " ".join(self.prepare_search_values())
    self.search_document = self.prepare_search_vector()

    super().save(*args, **kwargs)

    # TODO: This functionality should be extracted and moved to a separate function, too hidden here
    if creating:
        AssignedReviewers = apps.get_model("funds", "AssignedReviewers")
        ApplicationRevision = apps.get_model("funds", "ApplicationRevision")

        # The id is only available after the save above
        if not self.public_id:
            self.public_id = (
                f"{self.get_from_parent('submission_id_prefix')}{self.id}"
            )

        # `files` was extracted before the first save, see above
        self.process_file_data(files)
        AssignedReviewers.objects.bulk_create_reviewers(
            list(self.get_from_parent("reviewers").all()),
            self,
        )
        # TODO: This functionality should be implemented into `ApplicationSubmission.create_revision`
        first_revision = ApplicationRevision.objects.create(
            submission=self,
            form_data=self.form_data,
            author=self.user,
            is_draft=self.is_draft,
        )
        self.live_revision = first_revision
        self.draft_revision = first_revision
        # Second save persists public_id and the revision pointers; the id
        # is set by now, so the creation branch is not entered again
        self.save()

reviewed_by

reviewed_by(user)
Source code in hypha/apply/funds/models/submissions.py
def reviewed_by(self, user):
    """Return whether ``assigned.reviewed()`` has an entry with ``user`` as reviewer."""
    reviewed_assignments = self.assigned.reviewed()
    by_user = reviewed_assignments.filter(reviewer=user)
    return by_user.exists()

flagged_by

flagged_by(user)
Source code in hypha/apply/funds/models/submissions.py
def flagged_by(self, user):
    """Return whether ``user`` has a flag of type ``Flag.USER`` on this submission."""
    user_flags = self.flags.filter(user=user, type=Flag.USER)
    return user_flags.exists()

has_permission_to_review

has_permission_to_review(user)
Source code in hypha/apply/funds/models/submissions.py
def has_permission_to_review(self, user):
    """Return whether ``user`` is allowed to review this submission.

    Permission is granted when the user is apply staff, when the user is in
    ``reviewers_not_reviewed``, or when the user is a community reviewer
    other than the applicant, community review is enabled on the submission
    and the user has not already reviewed it.
    """
    if user.is_apply_staff or user in self.reviewers_not_reviewed:
        return True

    eligible_community_reviewer = (
        user.is_community_reviewer
        and self.user != user
        and self.community_review
        and not self.reviewed_by(user)
    )
    return bool(eligible_community_reviewer)

can_review

can_review(user)
Source code in hypha/apply/funds/models/submissions.py
def can_review(self, user):
    """Return whether ``user`` may submit a review now.

    Always ``False`` once the user has reviewed the submission; otherwise
    the result of ``has_permission_to_review``.
    """
    return not self.reviewed_by(user) and self.has_permission_to_review(user)

can_view_draft

can_view_draft(user)
Source code in hypha/apply/funds/models/submissions.py
def can_view_draft(self, user):
    """Return whether ``user`` may view this submission while it is a draft.

    The applicant always can. Apply staff can when
    ``settings.SUBMISSIONS_DRAFT_ACCESS_STAFF`` is truthy, and apply staff
    admins can when ``settings.SUBMISSIONS_DRAFT_ACCESS_STAFF_ADMIN`` is
    truthy.
    """
    if self.user == user:
        return True

    return bool(
        (user.is_apply_staff and settings.SUBMISSIONS_DRAFT_ACCESS_STAFF)
        or (
            user.is_apply_staff_admin
            and settings.SUBMISSIONS_DRAFT_ACCESS_STAFF_ADMIN
        )
    )

get_searchable_contents

get_searchable_contents()
Source code in hypha/apply/funds/models/submissions.py
def get_searchable_contents(self):
    """Collect the searchable text of every question field.

    For each id in ``question_field_ids`` the field's block is asked for its
    searchable content. Falsy results are skipped, list results are joined
    with ``", "`` and anything else is kept as returned.

    Returns:
        A list with one entry per field that produced searchable content.
    """
    contents = []
    for field_id in self.question_field_ids:
        field = self.field(field_id)
        answer = self.data(field_id)
        searchable = field.block.get_searchable_content(field.value, answer)
        if not searchable:
            continue
        contents.append(
            ", ".join(searchable) if isinstance(searchable, list) else searchable
        )
    return contents

prepare_search_values

prepare_search_values()
Source code in hypha/apply/funds/models/submissions.py
def prepare_search_values(self):
    """Return the searchable field contents plus the truthy named attributes.

    The named attributes ``full_name``, ``email``, ``title`` and
    ``public_id`` are appended, in that order, when they are truthy.
    """
    values = self.get_searchable_contents()

    # Add named fields into the search index
    named_values = (
        getattr(self, name) for name in ("full_name", "email", "title", "public_id")
    )
    values.extend(value for value in named_values if value)
    return values

index_components

index_components()
Source code in hypha/apply/funds/models/submissions.py
def index_components(self):
    """Return the text to index, keyed by ``"A"``, ``"C"`` and ``"B"``.

    ``"A"`` holds ``id:<public_id or id>`` and the title, ``"C"`` the
    applicant's full name and email, and ``"B"`` the searchable field
    contents, each joined with single spaces.
    """
    identifier = f"id:{self.public_id or self.id}"
    heading = " ".join((identifier, self.title))
    applicant = " ".join((self.full_name, self.email))
    answers = " ".join(self.get_searchable_contents())
    return {"A": heading, "C": applicant, "B": answers}

prepare_search_vector

prepare_search_vector()
Source code in hypha/apply/funds/models/submissions.py
def prepare_search_vector(self):
    """Combine the index components into a single weighted search vector.

    Each ``(weight, text)`` pair from ``index_components()`` becomes a
    ``SearchVector`` with that weight; the vectors are then added together.
    """
    weighted_vectors = [
        SearchVector(Value(text), weight=weight)
        for weight, text in self.index_components().items()
    ]
    return reduce(operator.add, weighted_vectors)

get_absolute_url

get_absolute_url()
Source code in hypha/apply/funds/models/submissions.py
def get_absolute_url(self):
    """Return the URL reversed from ``funds:submissions:detail`` for this id."""
    url_args = (self.id,)
    return reverse("funds:submissions:detail", args=url_args)

get_data

get_data()
Source code in hypha/apply/funds/models/submissions.py
def get_data(self):
    """Return a copy of ``form_data`` with ``submit_time`` added."""
    # Updated for JSONField - Not used but base get_data will error
    snapshot = self.form_data.copy()
    snapshot["submit_time"] = self.submit_time
    return snapshot

get_current_screening_status

get_current_screening_status()
Source code in hypha/apply/funds/models/submissions.py
def get_current_screening_status(self):
    """Return the first entry of ``screening_statuses``, as given by ``first()``."""
    current_status = self.screening_statuses.first()
    return current_status

get_yes_screening_status

get_yes_screening_status()
Source code in hypha/apply/funds/models/submissions.py
def get_yes_screening_status(self):
    """Return whether any screening status with ``yes=True`` is set.

    Note that this returns the result of ``exists()``, not a status object.
    """
    yes_statuses = self.screening_statuses.filter(yes=True)
    return yes_statuses.exists()

get_no_screening_status

get_no_screening_status()
Source code in hypha/apply/funds/models/submissions.py
def get_no_screening_status(self):
    """Return the first screening status with ``yes=False``, as given by ``first()``."""
    no_statuses = self.screening_statuses.filter(yes=False)
    return no_statuses.first()

make_permission_check

make_permission_check(users)
Source code in hypha/apply/funds/models/submissions.py
def make_permission_check(users):
    """Build a permission callable for the given ``UserPermissions`` members.

    Args:
        users: container of ``UserPermissions`` values that are allowed.

    Returns:
        A ``can_transition(instance, user)`` function. It returns ``True``
        when ``user`` matches at least one of the allowed roles: apply staff
        for ``STAFF``, superuser for ``ADMIN``, the instance's lead for
        ``LEAD``, or the instance's user for ``APPLICANT``.
    """

    def can_transition(instance, user):
        allowed = (
            (UserPermissions.STAFF in users and user.is_apply_staff)
            or (UserPermissions.ADMIN in users and user.is_superuser)
            or (UserPermissions.LEAD in users and instance.lead == user)
            or (UserPermissions.APPLICANT in users and instance.user == user)
        )
        return bool(allowed)

    return can_transition

wrap_method

wrap_method(func)
Source code in hypha/apply/funds/models/submissions.py
def wrap_method(func):
    """Return a new function object that forwards every call to ``func``."""

    # Provides a new function that can be wrapped with the django_fsm method
    # Without this using the same method for multiple transitions fails as
    # the fsm wrapping is overwritten
    def wrapped(*call_args, **call_kwargs):
        outcome = func(*call_args, **call_kwargs)
        return outcome

    return wrapped

transition_id

transition_id(target, phase)
Source code in hypha/apply/funds/models/submissions.py
def transition_id(target, phase):
    """Build the identifier for the transition from ``phase`` to ``target``.

    The result has the form
    ``transition__<lower-cased stage name>__<phase name>__<target>``.
    """
    stage_slug = phase.stage.name.lower()
    return "__".join(("transition", stage_slug, phase.name, target))

log_status_update

log_status_update(sender, **kwargs)
Source code in hypha/apply/funds/models/submissions.py
@receiver(post_transition, sender=ApplicationSubmission)
def log_status_update(sender, **kwargs):
    """Send the activity messages that follow a submission status change.

    Connected to the ``post_transition`` signal of ``ApplicationSubmission``.

    Reads from the signal kwargs:
        instance: the submission that was transitioned.
        source: the status the submission had before the transition.
        method_kwargs: the keyword arguments the transition method was
            called with. ``by`` is required. ``request`` is optional (for
            example when a transition is run from the console with only
            ``by``); without it the request-bound notifications are
            skipped. ``notify`` defaults to ``True``.
    """
    instance = kwargs["instance"]
    source = kwargs["source"]
    old_phase = instance.workflow[source]

    method_kwargs = kwargs["method_kwargs"]
    by = method_kwargs["by"]
    # A missing request must not raise: the check below already treats a
    # falsy request as "do not notify"
    request = method_kwargs.get("request")
    notify = method_kwargs.get("notify", True)

    if request and notify:
        if source == DRAFT_STATE:
            # remove task from applicant dashboard for this instance
            remove_tasks_for_user(code=SUBMISSION_DRAFT, user=by, related_obj=instance)
            # notify for a new submission
            messenger(
                MESSAGES.NEW_SUBMISSION,
                request=request,
                user=by,
                source=instance,
            )
        else:
            messenger(
                MESSAGES.TRANSITION,
                user=by,
                request=request,
                source=instance,
                related=old_phase,
            )

        if instance.status in review_statuses:
            messenger(
                MESSAGES.READY_FOR_REVIEW,
                user=by,
                request=request,
                source=instance,
            )

    # NOTE(review): this message is sent regardless of `request`/`notify`,
    # as in the original code - confirm that is intended
    if instance.status in STAGE_CHANGE_ACTIONS:
        messenger(
            MESSAGES.INVITED_TO_PROPOSAL,
            request=request,
            user=by,
            source=instance,
        )