class MyConfigFlow(ConfigFlow, domain=DOMAIN):
    """My config flow.

    Implements two steps that share one form (host + API token) and one
    validation routine:

    * ``user`` creates a new config entry.
    * ``reconfigure`` updates the data of an existing entry.
    """

    @staticmethod
    def _credentials_schema() -> vol.Schema:
        """Return the form schema shared by the user and reconfigure steps."""
        return vol.Schema(
            {
                vol.Required(CONF_HOST): TextSelector(),
                vol.Required(CONF_API_TOKEN): TextSelector(),
            }
        )

    @staticmethod
    async def _async_check_credentials(
        user_input: dict[str, Any],
    ) -> tuple[Any, dict[str, str]]:
        """Check the submitted host/token against the remote service.

        Returns a ``(user_id, errors)`` pair. ``errors`` is empty when
        ``check_connection`` succeeded, in which case ``user_id`` is whatever
        it returned. When ``MyException`` is raised, ``user_id`` is ``None``
        and ``errors`` maps ``"base"`` to ``"cannot_connect"``.
        """
        client = MyClient(user_input[CONF_HOST], user_input[CONF_API_TOKEN])
        try:
            user_id = await client.check_connection()
        except MyException:
            return None, {"base": "cannot_connect"}
        return user_id, {}

    async def async_step_reconfigure(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle reconfiguration of the integration.

        Without input (``None`` or an empty dict) the form is shown. With
        input, the credentials are checked; on success the reconfigure entry
        is updated with the submitted data via
        ``async_update_reload_and_abort``. On a connection failure the form
        is shown again with the error.
        """
        errors: dict[str, str] = {}
        if user_input:
            user_id, errors = await self._async_check_credentials(user_input)
            if not errors:
                await self.async_set_unique_id(user_id)
                # Guard against the new credentials belonging to a different
                # account than the entry being reconfigured.
                self._abort_if_unique_id_mismatch(reason="wrong_account")
                return self.async_update_reload_and_abort(
                    self._get_reconfigure_entry(),
                    data_updates=user_input,
                )
        return self.async_show_form(
            step_id="reconfigure",
            data_schema=self._credentials_schema(),
            errors=errors,
        )

    async def async_step_user(
        self, user_input: dict[str, Any] | None = None
    ) -> ConfigFlowResult:
        """Handle a flow initialized by the user.

        Without input (``None`` or an empty dict) the form is shown. With
        input, the credentials are checked; on success a new entry titled
        ``"MyIntegration"`` is created from the submitted data. On a
        connection failure the form is shown again with the error.
        """
        errors: dict[str, str] = {}
        if user_input:
            user_id, errors = await self._async_check_credentials(user_input)
            if not errors:
                await self.async_set_unique_id(user_id)
                # Guard against setting up the same account twice.
                self._abort_if_unique_id_configured()
                return self.async_create_entry(
                    title="MyIntegration",
                    data=user_input,
                )
        return self.async_show_form(
            step_id="user",
            data_schema=self._credentials_schema(),
            errors=errors,
        )