|
- from redbot.core import commands, checks, Config
- from redbot.core.utils.chat_formatting import pagify
- import discord
- import asyncio
- import requests
- import xml.etree.cElementTree as et
- import time
- import datetime
- import gzip
-
- R_HEADERS = {'User-Agent': '<Nation: Haku>'}
- RATE_LIMIT = 0.7
-
-
- def canonicalize(in_str):
- return in_str.lower().replace(' ', '_')
-
-
- def get_founderless_regions():
- """Return the list of founderless regions."""
- time.sleep(RATE_LIMIT)
- r = requests.get('https://www.nationstates.net/cgi-bin/api.cgi?q=regionsbytag;tags=founderless',
- headers=R_HEADERS)
- tree = et.fromstring(r.text)
- return [canonicalize(region) for region in tree[0].text.split(',')]
-
-
- def download_region_dump():
- """Download the latest region dump from the NS API."""
- time.sleep(RATE_LIMIT)
- r = requests.get('https://www.nationstates.net/pages/regions.xml.gz',
- headers=R_HEADERS)
- with open('regions.xml.gz', 'wb') as f:
- f.write(r.content)
- with gzip.open('regions.xml.gz', 'rb') as f_in:
- with open('regions.xml', 'wb') as f_out:
- f_out.write(f_in.read())
-
-
- def get_region_endos(regions):
- """Using the region dump, return a Dict with the number of endos for each region."""
- tree = et.parse('regions.xml')
- endo_dict = dict()
- root = tree.getroot()
- for region in root:
- if region[0].text in regions:
- region_name = canonicalize(region[0].text)
- endo_dict[region_name] = int(region[5].text) - 1
- return endo_dict
-
-
- class FounderlessNotify(commands.Cog):
- """Notifies when a region changes in founderless status."""
-
- def __init__(self, bot, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.bot = bot
- self.config = Config.get_conf(self, identifier=82732344, force_registration=True)
- default_global_settings = {'update_time': 0, 'previous_founderless': [], 'notify_channel': 0}
- self.config.register_global(**default_global_settings)
- self.bg_loop_task = asyncio.create_task(self.bg_loop())
-
- def cog_unload(self):
- if self.bg_loop_task:
- self.bg_loop_task.cancel()
-
- @commands.command()
- @checks.is_owner()
- async def is_task_running(self, ctx):
- """Check if the main loop is running."""
- if self.bg_loop_task:
- await ctx.send('True')
- else:
- await ctx.send('False')
-
- @commands.command()
- @checks.is_owner()
- async def start_task(self, ctx):
- """Start the main loop if it is not running."""
- if self.bg_loop_task:
- await ctx.send('Task is already running')
- else:
- self.bg_loop_task = asyncio.create_task(self.bg_loop())
-
- @commands.command()
- @checks.is_owner()
- async def force_check(self, ctx, update_type):
- """Force perform a check in the differences of founderless regions."""
- if update_type.lower() == 'major' or update_type.lower() == 'minor':
- await self.update_founderless(update_type)
- else:
- return
-
- async def update_founderless(self, update_type):
- """Check for the differences in the founderless regions and then output that to a channel."""
- channel_id = await self.config.notify_channel()
- channel = self.bot.get_channel(channel_id)
- await channel.send(f'Beginning {update_type} check...')
- new_founderless_regions = get_founderless_regions()
- previous_founderless_regions = await self.config.previous_founderless()
- # A region is now founderless if it is in the current list, but wasn't in the previous list
- now_founderless = [region for region in new_founderless_regions if (region not in previous_founderless_regions)]
- # Get the region endos for both lists
- now_founderless_endos = get_region_endos(now_founderless)
- # Create an ordered set with each region's name and delegate endorsement level
- out = []
- for region in now_founderless:
- try:
- region_endos = now_founderless_endos[region]
- except KeyError:
- region_endos = -1
- out.append((region_endos, region))
- # Sort the output by delegate endos in descending order
- out.sort()
- out.reverse()
- # Prep output
- message_content = "The following regions are now **Founderless**:\n"
- for region in out:
- message_content += f"https://www.nationstates.net/region={region[1]} ({region[0]})\n"
- for page in pagify(message_content):
- await channel.send(page)
- await self.config.previous_founderless.set(new_founderless_regions)
-
- async def bg_loop(self):
- """Main background loop."""
- while True:
- # Only check once every 5 minutes
- await asyncio.sleep(300)
- current_time = datetime.datetime.utcnow()
- major_time = await self.config.update_time()
- minor_time = major_time + 12
- # Major Update
- if current_time.hour == (major_time + 2):
- download_region_dump()
- await self.update_founderless('major')
- await asyncio.sleep(3600)
- # Minor Update
- elif current_time.hour == (minor_time + 1):
- await self.update_founderless('minor')
- await asyncio.sleep(3600)
-
- @commands.command()
- @checks.is_owner()
- async def update_channel(self, ctx):
- """Set the notification channel."""
- await self.config.notify_channel.set(ctx.channel.id)
- await ctx.send(f'Founderless notify channel set to {ctx.channel.mention}')
-
- @commands.command()
- @checks.is_owner()
- async def update_time(self, ctx, update):
- """Set the update time."""
- update = int(update)
- await self.config.update_time.set(update)
- await ctx.send(f'Major update set to: {await self.config.update_time()}:00 UTC'
- f'\nMinor update set to: {await self.config.update_time() + 12}:00 UTC')
-
- @commands.command()
- @checks.is_owner()
- async def get_settings(self, ctx):
- """Get the current settings from config."""
- stored_channel = await self.config.notify_channel()
- major_time = await self.config.update_time()
- minor_time = major_time + 12
- founderless = await self.config.previous_founderless()
- channel = ctx.guild.get_channel(stored_channel)
- await ctx.send(f'Current Channel: {channel.mention}\nCurrent Major Update Time: {major_time}:00 UTC\n'
- f'Current Minor Update Time: {minor_time}:00 UTC\nCurrent # Founderless: {len(founderless)}')
-
- @commands.command()
- @checks.is_owner()
- async def force_update_founderless(self, ctx):
- """Manually update the founderless region list."""
- founderless_regions = get_founderless_regions()
- await self.config.previous_founderless.set(founderless_regions)
- await ctx.send(f'Manually updated founderless regions list. There are now '
- f'{len(founderless_regions)} founderless regions.')
-
- @commands.command()
- @checks.is_owner()
- async def force_download_dump(self, ctx):
- """Manually download the region dump."""
- await ctx.send('Downloading region dump...')
- download_region_dump()
- await ctx.send('Region dump successfully updated.')
|