|
- from redbot.core import commands, checks, Config
- import discord
- import asyncio
- import requests
- import xml.etree.cElementTree as et
- import time
- import datetime
- import gzip
-
# HTTP headers passed as `headers=` to every requests.get call made by the
# fetch helpers in this file (get_founderless_regions, download_region_dump).
- R_HEADERS = {'User-Agent': '<Nation: Haku>'}
# Number of seconds the fetch helpers pass to time.sleep after each request.
- RATE_LIMIT = 0.7
-
-
def canonicalize(in_str):
    """Return *in_str* lower-cased with every space replaced by an underscore."""
    lowered = in_str.lower()
    # Splitting on a literal ' ' and re-joining with '_' swaps each space
    # one-for-one, so runs of spaces become runs of underscores.
    pieces = lowered.split(' ')
    return '_'.join(pieces)
-
-
def get_founderless_regions():
    """Return the list of founderless regions.

    Performs a blocking HTTP GET against the NationStates API and then sleeps
    for ``RATE_LIMIT`` seconds, so it must not be called directly on an event
    loop thread.

    Returns:
        list[str]: The comma-separated names found in the text of the first
        child of the response's root element; an empty list when that element
        has no text.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status.
        xml.etree.ElementTree.ParseError: If the response body is not valid XML.
    """
    try:
        r = requests.get('https://www.nationstates.net/cgi-bin/api.cgi?q=regionsbytag;tags=founderless',
                         headers=R_HEADERS,
                         timeout=30)  # without a timeout a stalled connection blocks forever
    finally:
        # Honour the rate limit even when the request itself failed.
        time.sleep(RATE_LIMIT)
    # Fail loudly on HTTP errors instead of handing an error page to the XML parser.
    r.raise_for_status()
    tree = et.fromstring(r.text)
    # NOTE(review): the first child is presumably the <REGIONS> element -- confirm
    # against the API documentation.
    names = tree[0].text
    if not names:
        # An empty element has text None (or ''); report "no regions" rather than
        # raising AttributeError or returning [''].
        return []
    return names.split(',')
-
-
def download_region_dump():
    """Download the latest region dump from the NS API.

    Streams ``regions.xml.gz`` into the current working directory and then
    decompresses it to ``regions.xml`` alongside it. Both steps copy in chunks
    so neither the archive nor the decompressed dump is held in memory whole.
    Blocking: performs network and disk I/O and sleeps for ``RATE_LIMIT``
    seconds after the request.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status (nothing is written in the latter case).
        OSError: If the files cannot be written or the archive is not valid gzip.
    """
    # Local import: the module header does not import shutil.
    import shutil

    try:
        with requests.get('https://www.nationstates.net/pages/regions.xml.gz',
                          headers=R_HEADERS,
                          stream=True,
                          timeout=60) as r:
            # Refuse to save an HTTP error page as if it were the archive.
            r.raise_for_status()
            with open('regions.xml.gz', 'wb') as f:
                for chunk in r.iter_content(chunk_size=65536):
                    f.write(chunk)
    finally:
        # Honour the rate limit whether or not the download succeeded.
        time.sleep(RATE_LIMIT)
    with gzip.open('regions.xml.gz', 'rb') as f_in, open('regions.xml', 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
-
-
def get_region_endos(regions):
    """Using the region dump, return a dict with the number of endos for each region.

    Reads ``regions.xml`` from the current working directory.

    Args:
        regions: Iterable of region names to look up. Names are compared
            verbatim against the text of each entry's first child element.

    Returns:
        dict: ``{name: int(text of the entry's sixth child) - 1}`` for every
        requested name present in the dump. Names that are not in the dump are
        omitted. Returns an empty dict without touching the file when
        ``regions`` is empty.

    Raises:
        OSError: If ``regions.xml`` cannot be opened.
        xml.etree.ElementTree.ParseError: If the file is not valid XML.
    """
    # A set gives O(1) membership tests; the dump is scanned once per call and
    # the requested list may itself be long.
    wanted = set(regions)
    endo_dict = {}
    if not wanted:
        # Nothing to look up: skip parsing the (large) dump entirely.
        return endo_dict
    root = et.parse('regions.xml').getroot()
    for region in root:
        # NOTE(review): children 0 and 5 are presumably NAME and DELEGATEVOTES --
        # confirm against the region dump schema.
        name = region[0].text
        if name in wanted:
            endo_dict[name] = int(region[5].text) - 1
    return endo_dict
-
-
class FounderlessNotify(commands.Cog):
    """Notifies when a region changes in founderless status."""

    # NOTE: docstrings on the cog and its commands are left short on purpose;
    # implementation notes live in `#` comments instead.

    def __init__(self, bot, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Local import: the module header does not import logging.
        import logging

        self.bot = bot
        self._log = logging.getLogger(__name__)
        self.config = Config.get_conf(self, identifier=82732344, force_registration=True)
        default_global_settings = {'update_time': 0, 'previous_founderless': [], 'notify_channel': 0}
        self.config.register_global(**default_global_settings)
        # Requires a running event loop at construction time.
        self.bg_loop_task = asyncio.create_task(self.bg_loop())

    def cog_unload(self):
        if self.bg_loop_task:
            self.bg_loop_task.cancel()

    def _task_is_running(self):
        # A Task object is always truthy, even after it has finished or crashed,
        # so liveness has to be read from done().
        task = self.bg_loop_task
        return task is not None and not task.done()

    async def _run_blocking(self, func, *args):
        # Run a blocking helper (HTTP request, time.sleep, file parsing) in the
        # default executor so the event loop stays responsive meanwhile.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    @commands.command()
    @checks.is_owner()
    async def is_task_running(self, ctx):
        """Check if the main loop is running."""
        if self._task_is_running():
            await ctx.send('True')
        else:
            await ctx.send('False')

    @commands.command()
    @checks.is_owner()
    async def start_task(self, ctx):
        """Start the main loop if it is not running."""
        if self._task_is_running():
            await ctx.send('Task is already running')
        else:
            self.bg_loop_task = asyncio.create_task(self.bg_loop())
            await ctx.send('Task started')

    @commands.command()
    @checks.is_owner()
    async def force_check(self, ctx, update_type):
        """Force perform a check in the differences of founderless regions."""
        update_type = update_type.lower()
        if update_type in ('major', 'minor'):
            await self.update_founderless(update_type)
        else:
            # Tell the caller why nothing happened instead of returning silently.
            await ctx.send("Update type must be 'major' or 'minor'.")

    async def update_founderless(self, update_type):
        """Check for the differences in the founderless regions and then output that to a channel."""
        channel_id = await self.config.notify_channel()
        channel = self.bot.get_channel(channel_id)
        if channel is None:
            # notify_channel defaults to 0, and a stored channel may no longer be
            # visible to the bot; either way there is nowhere to post.
            self._log.warning('Founderless notify channel %r not found; skipping %s check.',
                              channel_id, update_type)
            return
        await channel.send(f'Beginning {update_type} check...')
        new_founderless_regions = await self._run_blocking(get_founderless_regions)
        previously_founderless = set(await self.config.previous_founderless())
        # A region is now founderless if it is in the current list, but wasn't in the previous list
        now_founderless = [region for region in new_founderless_regions
                           if region not in previously_founderless]
        try:
            now_founderless_endos = await self._run_blocking(get_region_endos, now_founderless)
        except OSError:
            # The dump has not been downloaded yet (or is unreadable): still
            # report the regions, just without endorsement counts.
            self._log.warning('Region dump unavailable; reporting without endorsement counts.',
                              exc_info=True)
            now_founderless_endos = {}
        await channel.send('The following regions are now **Founderless**:')
        message_content = ''
        for region in now_founderless:
            if len(message_content) > 1800:
                # Flush before the message grows past Discord's length limit.
                await channel.send(message_content)
                message_content = ''
            # A region can be missing from the dump (the dump is only refreshed on
            # the major check); show '?' instead of raising KeyError.
            endos = now_founderless_endos.get(region, '?')
            message_content += (f'https://www.nationstates.net/region={canonicalize(region)} '
                                f'({endos}e)\n')
        # Send the remaining contents just in case
        if message_content:
            await channel.send(message_content)

        await self.config.previous_founderless.set(new_founderless_regions)

    async def bg_loop(self):
        """Main background loop."""
        while True:
            # Only check once every 10 minutes
            await asyncio.sleep(600)
            current_hour = datetime.datetime.now(datetime.timezone.utc).hour
            major_time = await self.config.update_time()
            minor_time = major_time + 12
            # Hours wrap at midnight; without the modulo, late update times would
            # be compared against hours >= 24 and never match.
            if current_hour == (major_time + 2) % 24:
                update_type = 'major'
            elif current_hour == (minor_time + 1) % 24:
                update_type = 'minor'
            else:
                continue
            try:
                if update_type == 'major':
                    # Only the major check refreshes the region dump.
                    await self._run_blocking(download_region_dump)
                await self.update_founderless(update_type)
            except asyncio.CancelledError:
                # Cancellation (cog unload) must always propagate.
                raise
            except Exception:
                # One failed check must not kill the loop for good.
                self._log.exception('Scheduled %s founderless check failed.', update_type)
            # Sleep past the matching hour so the same update is not run twice,
            # whether it succeeded or failed.
            await asyncio.sleep(3600)

    @commands.command()
    @checks.is_owner()
    async def update_channel(self, ctx):
        """Set the notification channel."""
        await self.config.notify_channel.set(ctx.channel.id)
        await ctx.send(f'Founderless notify channel set to {ctx.channel.mention}')

    @commands.command()
    @checks.is_owner()
    async def update_time(self, ctx, update):
        """Set the update time."""
        try:
            hour = int(update)
        except ValueError:
            hour = None
        if hour is None or not 0 <= hour <= 23:
            # The value is compared against a UTC hour of day, so anything
            # outside 0-23 could never be honoured.
            await ctx.send('Update time must be a whole hour between 0 and 23.')
            return
        await self.config.update_time.set(hour)
        await ctx.send(f'Major update set to: {hour}:00 UTC'
                       f'\nMinor update set to: {(hour + 12) % 24}:00 UTC')

    @commands.command()
    @checks.is_owner()
    async def get_settings(self, ctx):
        """Get the current settings from config."""
        stored_channel = await self.config.notify_channel()
        major_time = await self.config.update_time()
        minor_time = (major_time + 12) % 24
        founderless = await self.config.previous_founderless()
        # Resolve through the bot rather than ctx.guild: ctx.guild is None in DMs
        # and the channel may belong to a different guild.
        channel = self.bot.get_channel(stored_channel)
        channel_text = channel.mention if channel is not None else 'Not set'
        await ctx.send(f'Current Channel: {channel_text}\nCurrent Major Update Time: {major_time}:00 UTC\n'
                       f'Current Minor Update Time: {minor_time}:00 UTC\nCurrent # Founderless: {len(founderless)}')

    @commands.command()
    @checks.is_owner()
    async def force_update_founderless(self, ctx):
        """Manually update the founderless region list."""
        founderless_regions = await self._run_blocking(get_founderless_regions)
        await self.config.previous_founderless.set(founderless_regions)
        await ctx.send(f'Manually updated founderless regions list. There are now '
                       f'{len(founderless_regions)} founderless regions.')

    @commands.command()
    @checks.is_owner()
    async def force_download_dump(self, ctx):
        """Manually download the region dump."""
        await ctx.send('Downloading region dump...')
        await self._run_blocking(download_region_dump)
        await ctx.send('Region dump successfully updated.')
|